Some Name

Reputation: 9521

Schedule computation concurrently for all elements of the fs2.Stream

I have an fs2.Stream with some elements (possibly infinitely many), and I want to schedule a computation for every element of the stream so that the computations run concurrently with each other. Here is what I tried:

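import cats.effect.{ContextShift, IO, Timer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)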

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

The program's output looks like this:

1
1
1
etc...

which is not what I expected. I'd like the scheduled computations for all elements of the original stream to be interleaved, rather than waiting for the first element's stream to terminate (which never happens, because the schedule is infinite).

Upvotes: 2

Views: 2361

Answers (2)

Bogdan Vakulenko

Reputation: 3390

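import cats.effect.IO
import cats.implicits._ // syntax such as >>
import fs2.Stream
import scala.concurrent.duration._

// Relies on the implicit ContextShift[IO] and Timer[IO] defined in the question.
val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.seconds) >> IO(println(id))
} yield res

// Evaluate up to 5 of the emitted IOs at a time, emitting results as they complete.
val stream = str.parEvalMapUnordered(5)(identity)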

stream.compile.drain.unsafeRunSync()

or

val stream = Stream.emits(List(1, 5, 7)).covary[IO]
  .map { id =>
    Stream.eval(timer.sleep(id.seconds) >> IO(println(id)))
  }
  .parJoinUnbounded // run all inner streams concurrently

stream.compile.drain.unsafeRunSync()

Upvotes: 1

Some Name

Reputation: 9521

Following the hints given by @KrzysztofAtłasik and @LuisMiguelMejíaSuárez, here is the solution I came up with:

val originalStream = fs2.Stream.emits(List(1, 2))

val scheduledComputation = originalStream.covary[IO]
  .map { id =>
    fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
  }
  .fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream))
  .flatten

The solution that @KrzysztofAtłasik proposed in a comment, interleaving id <- fs2.Stream.emits(List(1, 2)).covary[IO] and _ <- fs2.Stream.awakeEvery[IO](1.second), also works, but it does not allow each element to be scheduled on its own interval.
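The comment itself is not reproduced here, so the following is only one reading of that suggestion: it assumes the idea is to put the awakeEvery generator before the emits generator. SharedTickSketch is a name made up for illustration, and the sketch assumes fs2 2.x with cats-effect 2.x, as in the question.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object SharedTickSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  // Assumption: the timer is the outer generator, so every tick
  // replays the whole (finite) list of ids before the next tick.
  val stream: Stream[IO, Int] = for {
    _  <- Stream.awakeEvery[IO](1.second)
    id <- Stream.emits(List(1, 2)).covary[IO]
    _  <- Stream.eval(IO(println(id)))
  } yield id

  def main(args: Array[String]): Unit =
    // Bounded with take(4) so the demo terminates after two ticks.
    stream.take(4).compile.drain.unsafeRunSync()
}
```

Because all ids share the single outer tick, this shape only fits a finite list of ids that should all fire on the same period.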

To schedule the elements concurrently, each with a period equal to its own value in seconds, you can do the following:

val scheduleEachElementIndividually = originalStream.covary[IO]
  .map { id =>
    // each element ticks on its own period of id seconds
    fs2.Stream.awakeEvery[IO](id.seconds).evalMap(_ => IO.delay(println(id)))
  }
  .fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream))
  .flatten
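One caveat with the fold-based version: fold emits its accumulated result only after originalStream has ended, so nothing starts if the source itself is infinite. A minimal sketch of an alternative, assuming fs2 2.x with cats-effect 2.x as in the question, replaces the fold over merge with parJoinUnbounded, which starts each inner stream as soon as it is emitted. ParJoinSketch and ticks are names made up for illustration.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object ParJoinSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  // One infinite stream per element, ticking every `id` seconds.
  def ticks(id: Int): Stream[IO, Int] =
    Stream.awakeEvery[IO](id.seconds).map(_ => id)

  val scheduled: Stream[IO, Int] =
    Stream.emits(List(1, 2)).covary[IO]
      .map(ticks)                     // Stream[IO, Stream[IO, Int]]
      .parJoinUnbounded               // run every inner stream concurrently
      .evalTap(id => IO(println(id))) // print each tick as it arrives

  def main(args: Array[String]): Unit =
    // Bounded with take(6) so the demo terminates.
    scheduled.take(6).compile.drain.unsafeRunSync()
}
```

If the number of inner streams can grow without bound, parJoin(maxOpen) is the bounded variant and may be the safer choice.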

Upvotes: 1
