Reputation: 43
I have an interesting instance of the producer–consumer problem in Scala written in two different ways. Both ways use java.util.concurrent.ArrayBlockingQueue as the buffer, but they have the following differences:
Version 1 starts the Producer and Consumer code as regular Java threads.
Version 2 starts the Producer and Consumer code using ExecutionContext.global.
After executing both versions it can be clearly seen that Version1 never finishes execution (the number of consumers exceeds the number of producers: the two producers put 20 items in total while the three consumers attempt 30 takes, so at least one consumer keeps waiting indefinitely for new items to consume).
However, the same imbalance is also present in Version2, i.e. the number of consumers again exceeds the number of producers, yet this time the program does finish its execution. What I'm asking for in this question is an explanation of why this is the case.
What I'd say is that even though the consumers in Version2 also keep waiting for new items after the supply from the producers "dries up", the fact that the code is started via ExecutionContext.global means it runs on daemon threads. So once the main thread completes (here only after the artificially added Thread.sleep(1000) that prolongs the execution), the daemon threads are stopped as well, regardless of whether they finished their work or were still waiting. A small check of this assumption is included after the Version2 listing below.
However, I'm not sure whether this explanation addresses the root of the problem or is merely tangential. In other words, I may be stating something trivial while missing the obvious explanation. Could you please verify whether my general understanding is correct and, if necessary, help me find the proper explanation for the behavior of the two versions of the code? Thanks!
import java.util.concurrent.ArrayBlockingQueue

object Version1 extends App {
  // Producer: a plain java.lang.Thread that puts 10 items into the shared buffer
  class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
    override def run(): Unit =
      for (i <- 1 to 10) { println(s"$getName produced $i"); buffer.put(i) }
  }

  // Consumer: a plain java.lang.Thread that takes 10 items from the shared buffer
  class Consumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
    override def run(): Unit =
      for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take}")
  }

  val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
  for (i <- 1 to 2) new Producer(s"Producer$i", buffer).start() // 2 producers -> 20 items
  for (i <- 1 to 3) new Consumer(s"Consumer$i", buffer).start() // 3 consumers -> 30 takes
}
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.ExecutionContext

object Version2 extends App {
  val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
  val ec = ExecutionContext.global

  // 2 producers, each putting 10 items
  for (p <- 1 to 2)
    ec.execute(() => for (i <- 1 to 10) { println(s"Producer$p produced $i"); buffer.put(i) })

  // 3 consumers, each attempting 10 takes
  for (c <- 1 to 3)
    ec.execute(() => for (_ <- 1 to 10) println(s"Consumer$c consumed ${buffer.take}"))

  Thread.sleep(1000) // keep the main thread alive long enough for the output to appear
}
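To sanity-check the daemon-thread explanation, here is a minimal standalone sketch (my own addition, not part of either version) that only prints whether each kind of thread reports itself as a daemon:

import scala.concurrent.ExecutionContext

object DaemonCheck extends App {
  // A plain java.lang.Thread is non-daemon unless setDaemon(true) is called
  val t = new Thread(() => println(s"plain Thread:     daemon = ${Thread.currentThread.isDaemon}"))
  t.start()
  t.join()

  // The worker threads of ExecutionContext.global report daemon = true
  ExecutionContext.global.execute(() =>
    println(s"global EC worker: daemon = ${Thread.currentThread.isDaemon}"))
  Thread.sleep(100) // give the task a moment to run before the main thread exits
}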
Upvotes: 1
Views: 178
Reputation: 48420
scala.concurrent.ExecutionContext.global by default creates daemon Threads, whilst java.lang.Threads are non-daemon by default. You could make the Version1 app have the JVM terminate after the main thread exits by calling setDaemon(true) in the constructors of Producer and Consumer.
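For illustration, a minimal sketch of that change (only Producer is shown; Consumer would be adjusted in the same way):

class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
  setDaemon(true) // a daemon thread no longer keeps the JVM alive on its own

  override def run(): Unit =
    for (i <- 1 to 10) { println(s"$getName produced $i"); buffer.put(i) }
}

Note that with all producers and consumers daemonized, the main thread of Version1 would then also have to be kept alive for a while (e.g. with a Thread.sleep as in Version2), otherwise the JVM may exit before any items are produced or consumed.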
As a side note, in Scala there is no need to call execute directly or to manually create and manage java.lang.Threads. Instead we define an implicit ExecutionContext and, instead of creating Runnables, we pass a by-name task argument to the apply factory method of scala.concurrent.Future. The ExecutionContext is then passed to Futures implicitly, and behind the scenes it manages Threads for us and decides on which Thread to execute our action.
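For example, Version2 could be sketched roughly as follows with Future (the object name and the final Await are my own choices, not taken from the question):

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Version2WithFutures extends App {
  val buffer = new ArrayBlockingQueue[Integer](5)

  // Each Future body is passed by name; the implicit global ExecutionContext runs it
  val producers = for (p <- 1 to 2) yield Future {
    for (i <- 1 to 10) { println(s"Producer$p produced $i"); buffer.put(i) }
  }
  for (c <- 1 to 3) Future {
    for (_ <- 1 to 10) println(s"Consumer$c consumed ${buffer.take}")
  }

  // Wait for the producers only; the surplus consumer takes would block forever,
  // but since global's threads are daemons they do not keep the JVM alive.
  producers.foreach(Await.ready(_, 10.seconds))
}

Here Await plays roughly the role of the Thread.sleep(1000) in Version2: it keeps the main thread alive while work is going on, and the still-blocked daemon consumers are simply discarded when the JVM exits.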
Upvotes: 2