LLCampos
LLCampos

Reputation: 325

How to enqueue elements to a queue and then dequeue them?

Imports and implicits for the following snippets:

import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)

The goal is to add the integers 1 to 3 to a fs2 Queue and then dequeue them. The first attempt:

val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
  _ <- Stream.eval(q.enqueue1(None)) // Terminate queue
  outStream <- q.dequeue // Get dequeue stream
} yield outStream

s1.compile.toList.unsafeRunSync()

This returns List(1). Not sure why.

Second attempt:

val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
  ).parJoin(2) // Run both streams in parallel
  outStream <- q.dequeue // Get dequeue stream
} yield outStream 

s2.compile.toList.unsafeRunSync()

This returns List(1,2). Also not sure why.

Why these examples returned what they returned? What's the proper way to do it?

Upvotes: 1

Views: 981

Answers (1)

Mateusz Kubuszok
Mateusz Kubuszok

Reputation: 27535

Take a look what you actually defined here:

val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int])
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
  _ <- Stream.eval(q.enqueue1(None))
  outStream <- q.dequeue // Get dequeue stream
} yield outStream

this is the same as

Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>

  Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>

    Stream.eval(q.enqueue1(None)).flatMap { _ =>
      q.dequeue
    }
  }
}
  • you create a queue in Stream
  • then for each element of this stream (for each Queue) you enqueue 3 Some elements - lazily meaning that until this element is needed the side-effect is not evaluated
  • then for for each enque you also put None into queue
  • you basically created a stream of Some(1), None, Some(2), None, Some(3), None before you got to the dequeue part! And since it is none terminated Queue it stops after the first None, so you end up with Stream(1) evaluated to List(1)

Meanwhile with second example you have

val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) 
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), 
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
  ).parJoin(2) 
  outStream <- q.dequeue 
} yield outStream 

which is equal to

Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>

  Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), 
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
  ).parJoin(2).flatMap { _ =>
    q.dequeue 
  }
}
  • you create a queue in Stream
  • then for each element of this stream (for each Queue) you create a smaller Stream
    • you are enqueuing Some 1, 2, 3 lazily in one stream
    • you enqueue None an another
    • you non-deterministically combine both streams, so you don't have a strong guarantees when sequence of Some will be interrupted by None, in your tests the circumstances made it so that it happened after evaluating 2 elements of the first stream

You basic error is using Stream[IO, *] and IO interchangably: when you make a flatMap on Stream the next line is NOT evaluated after all elements of the previous line has been evaluated, but after every single element of the previous line has been evaluated. After all Stream is (lazy, side-effecting) collection so in for-comprehension it behaves more like a (Lazy)List when it comes to order of operations.

If you didn't evaluate everything is Stream it would work as you expect:

val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]

def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
  Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))

def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
  queue.dequeue

// these are IO, not Stream[IO, *] !!!
val io = for {
  queue <- createQueue
  _ <- enqueueValues(queue).compile.drain.start
  result <- dequeueValues(queue).compile.toList
} yield result

io.runUnsafeSync

Upvotes: 2

Related Questions