Reputation: 325
Imports and implicits for the following snippets:
import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)
The goal is to add the integers 1 to 3 to an fs2 Queue and then dequeue them. The first attempt:
val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
  _ <- Stream.eval(q.enqueue1(None)) // Terminate queue
  outStream <- q.dequeue // Get dequeue stream
} yield outStream
s1.compile.toList.unsafeRunSync()
This returns List(1). Not sure why.
Second attempt:
val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
  ).parJoin(2) // Run both streams in parallel
  outStream <- q.dequeue // Get dequeue stream
} yield outStream
s2.compile.toList.unsafeRunSync()
This returns List(1,2). Also not sure why.
Why did these examples return what they returned? What's the proper way to do it?
Upvotes: 1
Views: 981
Reputation: 27535
Take a look at what you actually defined here:
val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int])
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
  _ <- Stream.eval(q.enqueue1(None))
  outStream <- q.dequeue // Get dequeue stream
} yield outStream
This is the same as:
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
  Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>
    Stream.eval(q.enqueue1(None)).flatMap { _ =>
      q.dequeue
    }
  }
}
Here, for each element of the outer Stream (there is only one, the Queue) you enqueue 3 Some elements, lazily, meaning that until an element is needed its side effect is not evaluated. For each of those elements you then enqueue None into the queue. So you end up with Some(1), None, Some(2), None, Some(3), None before you get to the dequeue part! And since it is a None-terminated Queue, it stops after the first None, so you end up with Stream(1), evaluated to List(1).
Meanwhile with second example you have
val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int])
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue),
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
  ).parJoin(2)
  outStream <- q.dequeue
} yield outStream
which is equal to
Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
  Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue),
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None))
  ).parJoin(2).flatMap { _ =>
    q.dequeue
  }
}
Here, for each element of the outer Stream (again only one, the Queue) you create a smaller Stream that enqueues Some of 1, 2, 3 lazily in one stream and None in another. The Some stream will be interrupted by None; in your tests the circumstances made it so that it happened after evaluating 2 elements of the first stream.

Your basic error is using Stream[IO, *] and IO interchangeably: when you call flatMap on a Stream, the next line is NOT evaluated after all elements of the previous line have been evaluated, but after every single element of the previous line has been evaluated. After all, Stream is a (lazy, side-effecting) collection, so in a for-comprehension it behaves more like a (Lazy)List when it comes to order of operations.
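To make the ordering concrete without any fs2 machinery, here is a minimal sketch that uses a plain Scala Iterator as a stand-in for a lazy collection. The names FlatMapOrder and effectOrder are made up for illustration, and the "enqueue" steps only append to a log rather than touching a real queue:

```scala
import scala.collection.mutable.ListBuffer

object FlatMapOrder {
  // Returns the order in which the logged "effects" actually ran.
  def effectOrder(): List[String] = {
    val log = ListBuffer.empty[String]

    // Iterator is lazy: each step runs only when an element is pulled.
    val program = for {
      n <- Iterator(1, 2, 3).map { i => log += s"enqueue Some($i)"; i }
      _ <- Iterator.single(()).map { _ => log += "enqueue None" }
    } yield n

    val values = program.toList // forces evaluation; values == List(1, 2, 3)
    log.toList
  }
}
```

The log comes out as enqueue Some(1), enqueue None, enqueue Some(2), enqueue None, enqueue Some(3), enqueue None: the second generator runs once per element of the first one, not once after all of them.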
If you didn't evaluate everything in Stream, it would work as you expect:
import fs2.concurrent.NoneTerminatedQueue

val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]
def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
  Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))
def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
  queue.dequeue
// these are IO, not Stream[IO, *] !!!
val io = for {
  queue <- createQueue
  _ <- enqueueValues(queue).compile.drain.start // run the producer on its own fiber
  result <- dequeueValues(queue).compile.toList // collect until the terminating None
} yield result
io.unsafeRunSync()
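If you would rather stay inside Stream, one possible alternative (not part of the approach above, just a sketch) is to run the producer in the background with concurrently while the dequeue stream runs in the foreground. This assumes fs2 2.x with cats-effect 2.x, as in the question's imports; ConcurrentlyExample, s3 and producer are illustrative names:

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object ConcurrentlyExample {
  // Needed to derive Concurrent[IO] for the queue and for `concurrently`.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val s3: Stream[IO, Int] =
    Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
      // Enqueue 1 to 3, then the terminating None, as ONE sequential stream.
      val producer: Stream[IO, Unit] =
        Stream(1, 2, 3).map(Some(_)).through(q.enqueue) ++ Stream.eval(q.enqueue1(None))
      // Dequeue in the foreground while the producer runs in the background.
      q.dequeue.concurrently(producer)
    }

  def run(): List[Int] = s3.compile.toList.unsafeRunSync()
}
```

The key difference from the original attempts is that the enqueueing is sequenced with ++ instead of flatMap, so None is enqueued once, after all three values, and ConcurrentlyExample.run() should give List(1, 2, 3).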
Upvotes: 2