user24139

Reputation: 43

Why does using ExecutionContext cause this code not to halt?

The problem

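I have an interesting instance of the producer–consumer problem in Scala, written in two different ways. Both versions use java.util.concurrent.ArrayBlockingQueue as the buffer, but they differ in how the work is started: Version1 creates and starts its own Thread subclasses, whereas Version2 submits the same loops as tasks to ExecutionContext.global.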

Running both versions shows that Version1 never finishes. The 3 consumers want to take 10 items each (30 in total), but the 2 producers only put 10 items each (20 in total), so at least one consumer keeps waiting indefinitely for new items.

The same imbalance exists in Version2: there are more consumers than producers, yet this time the program does finish. What I'm asking for is an explanation of why that is.

My attempt at the explanation

What I'd say is that the consumers in Version2 also keep waiting for new items after the supply from the producers "dries up", but because the code is submitted to an ExecutionContext, it runs on daemon threads. Once the main thread completes (here after the artificial Thread.sleep(1000) that prolongs execution), the daemon threads are stopped as well, regardless of whether they have finished their work or are still waiting.

However, I'm not sure whether this explanation gets at the root of the problem or is merely tangential. I may be stating something trivial while missing a more obvious explanation. Could you please verify whether my general understanding is correct and, if necessary, help me find the proper explanation for the behavior of the two versions of the code? Thanks!

Code

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.ExecutionContext

object Version1 extends App {

  class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
    override def run(): Unit =
      for (i <- 1 to 10) {println(s"$getName produced $i"); buffer.put(i)}
  }

  class Consumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
    override def run(): Unit =
      for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take}")
  }

  val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
  for (i <- 1 to 2) new Producer(s"Producer$i", buffer).start()
  for (i <- 1 to 3) new Consumer(s"Consumer$i", buffer).start()

}


object Version2 extends App {

  val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
  val ec = ExecutionContext.global

  for (p <- 1 to 2)
    ec.execute(() => for (i <- 1 to 10) {println(s"Producer$p produced $i"); buffer.put(i)})

  for (c <- 1 to 3)
    ec.execute(() => for (_ <- 1 to 10) {println(s"Consumer$c consumed ${buffer.take}")})

  Thread.sleep(1000)

}

Upvotes: 1

Views: 178

Answers (1)

Mario Galic

Reputation: 48420

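scala.concurrent.ExecutionContext.global creates daemon threads by default, whereas threads created with java.lang.Thread from the main thread, as in Version1, are non-daemon by default. The JVM exits once only daemon threads remain, so Version2 terminates when the main thread finishes, while Version1 is kept alive by its blocked consumer. You could make the JVM terminate in Version1 after the main thread exits by calling setDaemon(true) in the constructors of Producer and Consumer.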
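Here is a minimal sketch of both points, assuming Scala 2.12 or later (for the lambda-to-Runnable conversion). The names DaemonCheck, runsOnDaemon and DaemonConsumer are hypothetical and only serve the illustration; they are not part of the question's code.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.ExecutionContext

object DaemonCheck {

  // Submits a task to the given ExecutionContext and reports whether the
  // thread that ran it is a daemon thread.
  def runsOnDaemon(ec: ExecutionContext): Boolean = {
    val result = new ArrayBlockingQueue[java.lang.Boolean](1)
    ec.execute(() => result.put(Thread.currentThread().isDaemon))
    result.take()
  }

  // A Version1-style consumer that marks itself as a daemon in its constructor.
  // setDaemon must be called before start().
  class DaemonConsumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take()}")
  }

  def main(args: Array[String]): Unit = {
    println(s"global EC runs tasks on daemon threads: ${runsOnDaemon(ExecutionContext.global)}")

    val buffer = new ArrayBlockingQueue[Integer](5)
    val consumer = new DaemonConsumer("Consumer1", buffer)
    println(s"${consumer.getName} is daemon: ${consumer.isDaemon}")

    // The consumer blocks on take() forever because nothing is produced,
    // but as a daemon thread it should no longer keep the JVM alive.
    consumer.start()
    Thread.sleep(200)
  }
}
```

Removing the setDaemon(true) line should bring back the Version1 behavior, where the blocked consumer keeps the JVM running.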

As a side note, in Scala there is usually no need to call execute directly or to create and manage java.lang.Thread instances by hand. Instead, we define an implicit ExecutionContext and, rather than creating Runnables, pass the task as a by-name argument to the apply factory method of scala.concurrent.Future. The ExecutionContext is then passed to the Future implicitly; behind the scenes it manages the threads for us and decides which thread executes our task.
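A rough sketch of that Future-based style follows. The object name FutureVersion and its run method are hypothetical. As a deliberate change from the question, it uses 2 consumers instead of 3, so that the 20 takes match the 20 puts and every Future can complete. The calls to scala.concurrent.blocking are an addition that tells the global pool that put and take may block.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object FutureVersion {

  // Picked up implicitly by Future.apply and Future.sequence.
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Runs 2 producers and 2 consumers and returns the sum of all consumed items.
  def run(): Int = {
    val buffer = new ArrayBlockingQueue[Integer](5)

    // Each producer puts the numbers 1 to 10 into the buffer.
    val producers = for (p <- 1 to 2) yield Future {
      for (i <- 1 to 10) blocking { buffer.put(i) }
    }

    // Each consumer takes 10 items and sums them.
    val consumers = for (c <- 1 to 2) yield Future {
      (1 to 10).map(_ => blocking { buffer.take().intValue }).sum
    }

    // Wait for the consumers instead of sleeping for a fixed time.
    Await.result(Future.sequence(consumers), 10.seconds).sum
  }

  def main(args: Array[String]): Unit =
    println(s"Sum of consumed items: ${run()}")
}
```

Awaiting the consumer Futures replaces the Thread.sleep(1000) from Version2. With a third consumer, as in the question, that Await would be expected to time out, because the extra consumer never receives its items.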

Upvotes: 2
