Reputation: 9521
I have an fs2.Stream consisting of some elements (probably infinite), and I want to schedule some computation for all elements of the stream concurrently with each other. Here is what I tried:
import cats.effect.{ContextShift, IO, Timer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()
The program output looks like:
1
1
1
etc...
This is not what I expected. I'd like to interleave the scheduled computations for all elements of the original stream, rather than wait until the stream for the first element terminates (which never happens, because the scheduling is infinite).
Upvotes: 2
Views: 2361
Reputation: 3390
val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.second) >> IO(println(id))
} yield res
val stream = str.parEvalMapUnordered(5)(identity)
stream.compile.drain.unsafeRunSync()
or
val stream = Stream.emits(List(1, 5, 7))
  .map { id =>
    Stream.eval(timer.sleep(id.second) >> IO(println(id)))
  }
  .parJoinUnbounded
stream.compile.drain.unsafeRunSync()
Upvotes: 1
Reputation: 9521
Following the hints given by @KrzysztofAtłasik and @LuisMiguelMejíaSuárez, here is the solution I came up with:
val originalStream = fs2.Stream.emits(List(1, 2))
val scheduledComputation = originalStream.covary[IO].map({ id =>
  fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
The solution that @KrzysztofAtłasik proposed in the comment, interleaving id <- fs2.Stream.emits(List(1, 2)).covary[IO] and _ <- fs2.Stream.awakeEvery[IO](1.second), also works, but it does not allow each element to be scheduled in its own way.
To schedule the elements concurrently, each with its own period of elementValue seconds, it is possible to do the following:
val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
  // id.seconds
  fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
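Note that fold only emits its result once the outer stream has finished, so the fold/merge approach above presumably will not start anything if originalStream itself is infinite. A minimal sketch of the same idea using parJoinUnbounded (the combinator from the other answer), assuming fs2 2.x with cats-effect 2, might look like the following. The names ParJoinSketch and ticks are only illustrative, and the inner streams emit the id instead of printing it so the result is easy to inspect:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object, not part of fs2 or of the code above.
object ParJoinSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Builds one inner stream per id; each inner stream emits its id every `id` seconds.
  // parJoinUnbounded runs the inner streams concurrently as soon as the outer
  // stream emits them, without waiting for the outer stream to end.
  def ticks(ids: List[Int]): Stream[IO, Int] =
    Stream
      .emits(ids)
      .covary[IO]
      .map(id => Stream.awakeEvery[IO](id.seconds).map(_ => id))
      .parJoinUnbounded

  def main(args: Array[String]): Unit = {
    // take(6) only bounds the demo; without it the stream never terminates.
    val seen = ticks(List(1, 2)).take(6).compile.toList.unsafeRunSync()
    println(seen)
  }
}
```

To get the printing behaviour from the original snippets, replace .map(_ => id) with .evalMap(_ => IO.delay(println(id))) and run the result with compile.drain.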
Upvotes: 1