expert

Reputation: 30085

How do I preserve the materialized value in a custom Sink?

Could you please tell me how to preserve the materialized Future[Int] in an example like this?

val test: Sink[Int, NotUsed] =
  MergeHub.source[Int].grouped(100).to(Sink.fold(0L) { case (count, items) => count + items.sum }).run()

I'd like to get a Sink[Int, Future[Int]]. I looked at viaMat and toMat but couldn't figure it out.

Upvotes: 1

Views: 177

Answers (1)

Stefano Bonetti

Reputation: 9023

If you want to run() the whole graph in the same place, use toMat with Keep.right, which keeps the materialized value of the sink (the Future[Int]) rather than that of the source:

val result: Future[Int] =
    MergeHub.source[Int].grouped(100).toMat(Sink.fold(0) { case (count, items) => count + items.sum })(Keep.right).run()
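
One thing to watch with the snippet above: Keep.right drops the Sink[Int, NotUsed] that MergeHub.source materializes, so nothing can be attached to the hub afterwards. If both values are needed, Keep.both should return them as a tuple. A minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer); the names KeepBothSketch and runHub are made up for illustration:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink}
import scala.concurrent.Future

// Hypothetical wrapper object, not part of the original answer.
object KeepBothSketch {
  // Runs the hub and returns BOTH materialized values:
  //   - the Sink[Int, NotUsed] that producers attach to
  //   - the Future[Int] produced by Sink.fold
  def runHub()(implicit system: ActorSystem): (Sink[Int, NotUsed], Future[Int]) =
    MergeHub.source[Int]
      .grouped(100)
      // Type parameters are spelled out only to make inference explicit.
      .toMat(Sink.fold[Int, Seq[Int]](0)((count, items) => count + items.sum))(Keep.both)
      .run()
}
```

Keep.left and Keep.none follow the same pattern if only the hub sink, or neither value, is wanted.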

If you need to reuse your "group + fold" sink, strip out the source and the run() call and start from Flow[Int] instead:

  val sink: Sink[Int, Future[Int]] =
    Flow[Int].grouped(100).toMat(Sink.fold(0) { case (count, items) => count + items.sum })(Keep.right)
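
To show what "reuse" means here, this is a small sketch that runs the same sink against two independent sources. It assumes Akka 2.6+ (implicit ActorSystem as materializer); the object name ReusableSinkSketch and the sample inputs are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical wrapper object, not part of the original answer.
object ReusableSinkSketch {
  // The sink is only a blueprint: each run materializes a fresh Future[Int].
  val sink: Sink[Int, Future[Int]] =
    Flow[Int]
      .grouped(100)
      .toMat(Sink.fold[Int, Seq[Int]](0)((count, items) => count + items.sum))(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("reusable-sink-sketch")

    // runWith returns the sink's materialized value.
    val first: Future[Int]  = Source(1 to 250).runWith(sink)
    val second: Future[Int] = Source(List(10, 20, 30)).runWith(sink)

    println(Await.result(first, 3.seconds))  // 31375
    println(Await.result(second, 3.seconds)) // 60

    system.terminate()
  }
}
```

Await is used only to keep the demo short; in real code you would normally map or onComplete on the Future instead of blocking.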

More info in the docs.

Upvotes: 2
