SeDav

Reputation: 781

How to mapConcat() and then fold() in a nested Flow before the upstream completes

I'm trying to implement the following scenario, which I consider a very simple use case, but as I'm new to akka-streams I can't get it right.

At a certain stage of my stream graph, I split each element into N sub-elements using mapConcat, process each of them in parallel in a nested flow, and then fold them back together. The number N of sub-elements emitted by mapConcat is not known in advance and can range from zero to hundreds. However, as the docs state, fold only completes when the upstream completes, whereas I need a fan-in stage that emits as soon as all the sub-elements that mapConcat produced for one input element have been processed.

[Diagram: nested Flow with mapConcat and fold]

A minimal example would be something like this:

Source(1 to 10)
  .via(
     Flow[Int].map(el => el * el))
  .via(
     Flow[Int]
       .mapConcat(el => Set(el + 1, el + 2, el + 3))
       .map(el => el * el)
       .fold(0)((all, cur) => all + cur)
     )
  .runForeach(println)

For an interactive example see: https://scastie.scala-lang.org/hPdKEF4QS0yZazWMfFvuEQ

The fold operation folds all values, so it prints a single value. How can I construct the flow so that I get 10 results, one per source element?

Upvotes: 1

Views: 1536

Answers (2)

Leo C

Reputation: 22449

mapConcat applies a function to every element of the input stream and concatenates the resulting collections into the output stream. Although it has a slightly simpler signature, it operates like flatMapConcat and is probably best illustrated by the following diagram from the Akka Streams documentation.

[Image: mapConcat diagram from the Akka Streams documentation]

With the above picture in mind, to aggregate the flow content per Source element you can use grouped to regroup the elements emitted by mapConcat, as shown below:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

Source(1 to 10).
  via(
    Flow[Int].map(el => el * el)).
  via(
    Flow[Int].
      mapConcat(el => Set(el + 1, el + 2, el + 3)).
      map(el => el * el).
      grouped(3).
      map{ g =>
        val sum = g.reduce(_ + _)
        println(s"$g: sum = $sum")
        sum
      }.
      fold(0)(_ + _)
    ).
  runForeach(println)

// res1: scala.concurrent.Future[akka.Done] = Future(<not completed>)
// Vector(4, 9, 16): sum = 29
// Vector(25, 36, 49): sum = 110
// Vector(100, 121, 144): sum = 365
// Vector(289, 324, 361): sum = 974
// Vector(676, 729, 784): sum = 2189
// Vector(1369, 1444, 1521): sum = 4334
// Vector(2500, 2601, 2704): sum = 7805
// Vector(4225, 4356, 4489): sum = 13070
// Vector(6724, 6889, 7056): sum = 20669
// Vector(10201, 10404, 10609): sum = 31214
// 80759

[UPDATE]

If the Iterable generated by mapConcat varies in size per Source element, grouped is no longer helpful because there is no fixed group size. In some cases, you might be able to move the post-mapConcat transformations into the body of the function passed to mapConcat, as in the following example:

import java.util.concurrent.ThreadLocalRandom

Source(1 to 10).
  via(
    Flow[Int].map(el => el * el)).
  via(
    Flow[Int].
      mapConcat{ el =>
        val list = for (i <- 1 to ThreadLocalRandom.current.nextInt(1, 4))
          yield el + i
        val list2 = list.map(e => e * e)
        val sum = list2.reduce(_ + _)
        println(s"$list2: sum = $sum")
        list2
      }.
      fold(0)(_ + _)
    ).
  runForeach(println)

// res2: scala.concurrent.Future[akka.Done] = Future(<not completed>)
// Vector(4): sum = 4
// Vector(25, 36, 49): sum = 110
// Vector(100, 121, 144): sum = 365
// Vector(289): sum = 289
// Vector(676, 729): sum = 1405
// Vector(1369): sum = 1369
// Vector(2500, 2601, 2704): sum = 7805
// Vector(4225, 4356): sum = 8581
// Vector(6724, 6889, 7056): sum = 20669
// Vector(10201, 10404): sum = 20605
// 61202
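
Since flatMapConcat was mentioned above, here is a minimal sketch of how it could produce one folded result per Source element, even when the number of sub-elements varies. It is not taken from the original answer. It assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize streams (on 2.5 an implicit ActorMaterializer is also needed). The names PerElementFold, run and expand are hypothetical, and expand uses `el % 5` as an arbitrary, deterministic stand-in for a variable-size split. Each element gets its own small nested Source, so fold completes when that nested Source completes rather than when the outer upstream completes.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PerElementFold {
  // Hypothetical split: yields between 0 and 4 sub-elements depending on el.
  def expand(el: Int): List[Int] = (1 to (el % 5)).map(el + _).toList

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("per-element-fold")
    try {
      val sums = Source(1 to 10)
        .map(el => el * el)
        .flatMapConcat { el =>
          // The nested Source is finite, so fold emits once per outer element.
          // For an empty nested Source, fold emits its zero value (0 here).
          Source(expand(el)).map(e => e * e).fold(0)(_ + _)
        }
        .runWith(Sink.seq)
      Await.result(sums, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With this sketch the stream emits 10 values, one per Source element. flatMapConcat handles the nested sources one at a time, so the results stay in upstream order but are not computed in parallel.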

Upvotes: 1

rad i

Reputation: 294

You can get your 10 elements by grouping by N, where N is the number of results emitted by mapConcat for each element:

Source(1 to 10)
  .via(
    Flow[Int].map(el => el * el))
  .via(
    Flow[Int]
      .mapConcat(el => Set(el + 1, el + 2, el + 3))
      .map(el => el * el)
      .grouped(3)
      .map(s => s.sum))
  .runForeach(println)

But mapConcat does not create a substream; it just takes a single element and emits N elements, all on the same stream.
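
To make the flattening behaviour concrete, here is a small sketch. It assumes Akka 2.6+ with an implicit ActorSystem (on 2.5 an implicit ActorMaterializer is also needed), and the names MapConcatDemo and run are made up for illustration. The downstream sees one flat sequence of elements with no boundary between the groups.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-concat-demo")
    try {
      val flat = Source(1 to 3)
        // Each input element becomes two output elements on the same stream.
        .mapConcat(el => List(el, el * 10))
        .runWith(Sink.seq)
      Await.result(flat, 10.seconds)
    } finally system.terminate()
  }

  // Prints 1, 10, 2, 20, 3, 30 (one per line).
  def main(args: Array[String]): Unit = run().foreach(println)
}
```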

If, on the other hand, you really want to create substreams, I'd suggest having a look at the substreams section of the Akka Streams documentation (akka-doc#substream).
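
The question also asks for the per-element work to run in parallel. One way to sketch that without the substream operators is to run a small nested stream per element inside mapAsync. This is a different technique from the ones in the answers. It assumes Akka 2.6+ with an implicit ActorSystem (on 2.5 an implicit ActorMaterializer is also needed). The names ParallelPerElementFold and run and the parallelism value of 4 are arbitrary choices for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ParallelPerElementFold {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("parallel-per-element-fold")
    try {
      val sums: Future[Seq[Int]] = Source(1 to 10)
        .map(el => el * el)
        // Up to 4 nested streams run concurrently; mapAsync still emits
        // their results in upstream order.
        .mapAsync(parallelism = 4) { el =>
          Source(Set(el + 1, el + 2, el + 3))
            .map(e => e * e)
            .runFold(0)(_ + _) // Future completes when the nested stream ends
        }
        .runWith(Sink.seq)
      Await.result(sums, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

For the fixed three-element split used in the question, this should emit the same ten per-element sums as the grouped(3) version. Because each nested stream is finite, it also works when the split size varies.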

Upvotes: 1
