Reputation: 6168
I have a Process[Task, A]
, and I need to run a function A => B
whose run time ranges from instantaneous to really long on each A
of the stream to yield a Process[Task, B]
.
The catch is that I'd like to process each A
as soon as possible in an ExecutionContext
and pass the result as soon as I have it, regardless of the order in which the A
s are received.
A concrete example would be the following code, where I would hope all the odd numbers to be printed immediately and the even ones about 500ms later. What happens instead is that (odd, even) couples are printed, interleaved with 500ms pauses:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
Process.range(0, 100).flatMap { i =>
Process.eval(Task.apply {
if(i % 2 == 0) Thread.sleep(500)
i
}(executor))
}.to(io.printStreamSink(System.out)(_ println _))
.run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}
Upvotes: 1
Views: 43
Reputation: 6168
Turns out the answer is using channels. Here's the updated code that seems to do exactly what I want:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
val chan = channel.lift[Task, Int, Int] { i => Task {
if(i % 2 == 0) Thread.sleep(500)
i
}}
merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
.to(io.printStreamSink(System.out)(_ println _)).run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}
Upvotes: 1