Executors and Guava
// Limit 2 running threads at a time in an executor
val queue = new SynchronousQueue[Runnable]
val executor: ThreadPoolExecutor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.MILLISECONDS, queue, factory, new RejectedExecutionHandler {
override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
println(s"$delta $thread rejected!" + r)
executor.getQueue.put(r) // Try again!
}
})
(1 to 4).foreach(i => executor.submit(run(i)))
/*
2 at 447, 2 at 757 (that were rejected at first)
138 main rejected!java.util.concurrent.FutureTask@531be3c5
447 | 2 running! t-1
447 main rejected!java.util.concurrent.FutureTask@26f67b76
447 | 1 running! t-0
447 | WAITING ...
757 | 3 running! t-1
758 | 4 running! t-0
1447 | END!
With (2, 2, ..) only 1 rejected:
116 main rejected!java.util.concurrent.FutureTask@531be3c5
425 | 2 running! t-1
425 | 1 running! t-0
425 | WAITING ...
735 | 3 running! t-0
735 | 4 running! t-1
*/
val scheduledPool = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5, factory))
scheduledPool.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
println(s"$delta $thread waiting for a value...")
println(s"$delta $thread got ${syncQueue.poll()}")
} }, 0, 200, TimeUnit.MILLISECONDS)
val syncQueue = new SynchronousQueue[Int]()
// to have more than 1 blocking value: new ArrayBlockingQueue[Int](2)
println(s"$delta $thread --> putting 1...")
syncQueue.put(1)
println(s"$delta $thread --> putting 2...")
syncQueue.put(2)
println(s"$delta $thread --> putting 3...")
syncQueue.put(3)
println(s"$delta | WAITING ...")
Thread.sleep(1000)
println(s"$delta | END!")
scheduledPool.shutdown()
/*
129 t-0 waiting for a value...
129 main --> putting 1...
133 t-0 got null
240 t-0 waiting for a value...
240 main --> putting 2...
240 t-0 got 1
439 t-1 waiting for a value...
439 main --> putting 3...
439 t-1 got 2
640 t-0 waiting for a value...
640 t-0 got 3
640 | WAITING ...
839 t-2 waiting for a value...
839 t-2 got null
*/
// We can shutdown a pool, in this case, we can't submit more tasks
scheduledPool.shutdown()
try {
scheduledPool.scheduleAtFixedRate(run("---"), 0, 100, TimeUnit.MILLISECONDS)
// Too late! Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@1efbd816 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6a2bcfcb[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 7]
} catch {
case e: RejectedExecutionException => println("Too late! " + e.getMessage)
}
// We can create static Futures
val fu: ListenableFuture[Int] = Futures.immediateFuture(2)
fu.addListener(new Runnable { override def run() = println("immediate done!") }, scheduledPool)
// Transform
val trans = Futures.transform[Int, Boolean](fu, new Function[Int, Boolean] {
override def apply(input: Int) = {
println(s"$delta | transform running! ${Thread.currentThread().getName}")
if (input > 0) true else false
}
}).get(100, TimeUnit.MILLISECONDS)
println(s"$delta | After Transform: $trans")
package com.example
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}
import com.google.common.util.concurrent.{ListenableScheduledFuture, ListeningScheduledExecutorService, MoreExecutors, ThreadFactoryBuilder}
//
// ListenableFutures from Guava allows you to .addListener a Future (triggered when it's done)
// - Any ExecutorService can be converted (it's mostly just a proxy)
object Main extends App {
val start = System.currentTimeMillis()
def delta = System.currentTimeMillis() - start
val factory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("coucou-%d")
.setPriority(5)
.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
println(s"ERR: ${t.getName}:${e.getMessage}")
}
})
.build()
def run[A](foo: A) = new Runnable {
override def run() = {
Thread.sleep(310)
println(s"$delta | $foo running! ${Thread.currentThread().getName}")
}
}
def createThread(factory: ThreadFactory, foo: Int) = {
factory.newThread(run(foo))
}
val scheduledPool = Executors.newScheduledThreadPool(5, factory)
val lses: ListeningScheduledExecutorService = MoreExecutors.listeningDecorator(scheduledPool)
println(s"GO !")
val future: ScheduledFuture[_] = scheduledPool.scheduleAtFixedRate(run("---"), 0, 100, TimeUnit.MILLISECONDS)
val future2: ListenableScheduledFuture[_] = lses.scheduleAtFixedRate(run("+"), 0, 100, TimeUnit.MILLISECONDS)
future2.addListener(new Runnable { override def run(): Unit = println("I'm done!") }, scheduledPool)
println(s"$delta | WAITING ...")
Thread.sleep(1000)
println(s"$delta | END!")
future.cancel(false)
future2.cancel(false) // If you don't cancel(), the listener will never be called if the program exits
println(s"$delta | END END!")
}
/*
GO !
148 | WAITING ...
447 | --- running! coucou-0
457 | + running! coucou-1
757 | --- running! coucou-0
767 | + running! coucou-2
1068 | --- running! coucou-1
1077 | + running! coucou-2
1148 | END!
1149 | END END!
I'm done!
*/
object Main extends App {
val start = System.currentTimeMillis()
def delta = System.currentTimeMillis() - start
val factory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("coucou-%d")
.setPriority(5)
.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
println(s"ERR: ${t.getName}:${e.getMessage}")
}
})
.build()
def run(foo: Int) = new Runnable {
override def run() = {
println(s"$delta | $foo running! ${Thread.currentThread().getName}")
}
}
def createThread(factory: ThreadFactory, foo: Int) = {
factory.newThread(run(foo))
}
val scheduledPool = Executors.newScheduledThreadPool(5, factory)
println(s"GO !")
scheduledPool.scheduleAtFixedRate(run(1), 0, 100, TimeUnit.MILLISECONDS)
println(s"$delta | WAITING ...")
Thread.sleep(1000)
println(s"$delta | END!")
}
/*
GO !
131 | WAITING ...
132 | 1 running! coucou-0
232 | 1 running! coucou-0
332 | 1 running! coucou-1
431 | 1 running! coucou-0
531 | 1 running! coucou-2
631 | 1 running! coucou-1
731 | 1 running! coucou-1
830 | 1 running! coucou-1
931 | 1 running! coucou-1
1032 | 1 running! coucou-1
1131 | 1 running! coucou-1
1132 | END!
*/