Reputation: 9521
Problem:
I want to repeatedly take batches from an fs2.Stream provided by a third-party library. The goal is to abstract clients away from the fs2.Stream itself and simply give them F[List[Int]] batches as soon as they are ready.
Attempts:
I tried to use fs2.Stream::take and ran some examples.
I.
import cats.effect.{ContextShift, IO}
import cats.implicits._ // flatTap, iterateWhile
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst))).iterateWhile(_.nonEmpty)
} yield ()
r.unsafeRunSync()
It prints the very first batch, List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), and then hangs. I expected all the batches from 0 to 1000 to be printed.
Here is a somewhat simpler example:
II.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val r = for {
  queue <- fs2.concurrent.Queue.unbounded[IO, Int]
  stream = queue.dequeue
  _ <- fs2.Stream.range(0, 1000).covaryAll[IO, Int].evalTap(queue.enqueue1).compile.drain
  _ <- stream.take(10).compile.toList.flatTap(lst => IO(println(lst)))
  _ <- stream.take(20).compile.toList.flatTap(lst => IO(println(lst)))
} yield ()
r.unsafeRunSync()
The behavior is exactly the same as in I: it prints List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) and then hangs.
Question:
Given an fs2.Stream[IO, Int], how can I provide an effect IO[List[Int]] which iterates through the consecutive batches provided by the stream each time it is evaluated?
Upvotes: 0
Views: 370
Reputation: 22840
Well, you cannot have an IO[List[X]] that represents multiple batches; that IO would be a single batch.
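To make that concrete, here is a minimal sketch, assuming fs2 2.x with cats-effect 2 and a hypothetical source (a plain range) that restarts from the beginning every time it is run. The names SingleBatchDemo, source, firstBatch and twice are made up for illustration. A stateful source such as a queue may behave differently on the second evaluation, but the IO still only ever describes one batch.

```scala
import cats.effect.IO
import fs2.Stream

object SingleBatchDemo {
  // Hypothetical source: a pure range, which restarts from 0 on every run.
  val source: Stream[IO, Int] = Stream.range(0, 1000).covary[IO]

  // A single IO value describing a single batch: the first 10 elements.
  val firstBatch: IO[List[Int]] = source.take(10).compile.toList

  // Evaluating the same IO twice runs the stream twice from the start;
  // it does not continue where the previous evaluation stopped.
  val twice: IO[(List[Int], List[Int])] =
    for {
      a <- firstBatch
      b <- firstBatch
    } yield (a, b)
}
```

With this source, both components of twice hold the same first ten elements, so the IO by itself carries no notion of a "next" batch.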
The best you can do is something like this:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit]
That is, your users give you an operation to execute for each batch, and you give them back an IO that blocks the current fiber until the whole stream has been consumed using that function.
And the simplest way to implement such a function would be:
def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
  getStreamFromThirdParty
    .chunkN(n = ChunkSize)
    .evalMap(chunk => process(chunk.toList))
    .compile
    .drain
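For anyone who wants to try this out, below is a self-contained sketch of the same function, again assuming fs2 2.x with cats-effect 2. The 25-element range standing in for getStreamFromThirdParty, the value ChunkSize = 10, and the collected helper are all assumptions made up for the example, not part of any third-party API.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object BatchDemo {
  // Hypothetical stand-in for the stream coming from the third-party library.
  val getStreamFromThirdParty: Stream[IO, Int] = Stream.range(0, 25).covary[IO]

  // Hypothetical batch size.
  val ChunkSize: Int = 10

  // Runs `process` once per batch; the last batch may be smaller than
  // ChunkSize, because chunkN allows fewer elements by default.
  def processByBatches(process: List[Int] => IO[Unit]): IO[Unit] =
    getStreamFromThirdParty
      .chunkN(n = ChunkSize)
      .evalMap(chunk => process(chunk.toList))
      .compile
      .drain

  // Example caller: records every batch it receives, in order.
  val collected: IO[List[List[Int]]] =
    for {
      ref     <- Ref.of[IO, List[List[Int]]](Nil)
      _       <- processByBatches(batch => ref.update(_ :+ batch))
      batches <- ref.get
    } yield batches
}
```

Running BatchDemo.collected.unsafeRunSync() should yield three batches covering 0 to 9, 10 to 19, and 20 to 24. A caller that only wants to print could pass batch => IO(println(batch)) instead.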
Upvotes: 5