Nicolas Rinaudo

Reputation: 6168

Asynchronous "node" in a scalaz-stream

I have a Process[Task, A] and need to apply a function A => B to each element of the stream to obtain a Process[Task, B]. The function's run time ranges from instantaneous to really long.

The catch is that I'd like to process each A as soon as possible in an ExecutionContext and emit each result as soon as it is available, regardless of the order in which the As were received.

A concrete example is the following code, where I would expect all the odd numbers to be printed immediately and the even ones about 500ms later. What happens instead is that the numbers are printed in their original order, in pairs, with a 500ms pause before each pair:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if(i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
  .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}

Upvotes: 1


Answers (1)

Nicolas Rinaudo

Reputation: 6168

Turns out the answer is to use a channel together with merge.mergeN. Here's the updated code, which seems to do exactly what I want:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
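  // Lift the computation into a channel; each Task runs on the dedicated pool.
  val chan = channel.lift[Task, Int, Int] { i => Task {
    if(i % 2 == 0) Thread.sleep(500)
    i
  }(executor)}

  // Turn each element into its own single-element process, then merge up to
  // 8 of them concurrently so results are emitted as they complete.
  merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
    .to(io.printStreamSink(System.out)(_ println _)).run.run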

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
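
As far as I can tell, the concurrency here comes from merge.mergeN rather than from the channel itself, so the same effect should be achievable by mapping each element to its own one-element process. Below is a minimal sketch of that variant, assuming the same scalaz-stream and scalaz versions as the code above. The object name MergeOnly and the helper unordered are made up for illustration and are not part of the library.

```scala
import java.util.concurrent.{ExecutorService, Executors}

import scalaz.concurrent.Task
import scalaz.stream._

object MergeOnly {
  // Hypothetical helper (not from the original post): emits 0 until n, each
  // value computed in its own Task on `pool`, with at most `maxOpen` inner
  // processes open at the same time.
  def unordered(n: Int, maxOpen: Int)(pool: ExecutorService): Process[Task, Int] = {
    val inner: Process[Task, Process[Task, Int]] =
      Process.range(0, n).map { i =>
        Process.eval(Task {
          if (i % 2 == 0) Thread.sleep(500) // simulate slow work for even numbers
          i
        }(pool))
      }
    // Results are emitted as the inner processes complete, not in input order.
    merge.mergeN(maxOpen)(inner)
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(8)
    try unordered(100, 8)(pool).to(io.printStreamSink(System.out)(_ println _)).run.run
    finally pool.shutdown()
  }
}
```

Note that maxOpen bounds how many elements are in flight at once, so I would expect a run of slow elements to still be able to hold back the fast ones queued behind them. Raising maxOpen (and the pool size) should reduce that effect.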

Upvotes: 1
