Snowy Coder Girl

Reputation: 5518

Manipulate Seq Elements in an Akka Flow

I have 2 flows like the following:

val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...

I want to combine these into a convenience method like the following:

val aToSeqOfC: Flow[A, Seq[C], NotUsed]

So far I have the following, but I know it just ends up with C elements and not Seq[C].

Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)

How can I preserve the Seq in this scenario?

Upvotes: 2

Views: 295

Answers (1)

Indirect Answer

In my opinion, your question highlights a common "rookie mistake" when working with Akka Streams: putting business logic directly inside stream constructs, which is usually poor organization. Your question suggests that you have something of the form:

val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B => 
  //business logic
}

The more ideal scenario would be if you had:

//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
  //business logic
}

val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc

In the above "ideal" example the Flow is just a thin veneer on top of normal, non-akka, business logic.

With the logic separated out, your original question can be solved by mapping the plain function over each Seq[B]:

val aToSeqOfC : Flow[A, Seq[C], NotUsed] = 
  aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
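To make this concrete, here is a minimal runnable sketch of the same composition. The concrete types (String for A and B, Int for C), the word-splitting and length logic, and the `IndirectSketch` and `run` names are all hypothetical stand-ins, and it assumes Akka 2.6 or later, where an implicit ActorSystem in scope provides the Materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._

object IndirectSketch {
  // Hypothetical stand-ins: A = String (a sentence), B = String (a word), C = Int (a length)
  val aToSeqOfB: Flow[String, Seq[String], NotUsed] =
    Flow[String].map(_.split(" ").toList)

  // Plain function, no Akka involved
  val bToCFunc: String => Int = _.length

  // Map the plain function over each Seq[B], so the grouping is preserved
  val aToSeqOfC: Flow[String, Seq[Int], NotUsed] =
    aToSeqOfB.via(Flow[Seq[String]].map(_.map(bToCFunc)))

  // Runs the flow over the given inputs and blocks for the result (fine for a demo only)
  def run(inputs: List[String]): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("indirect-sketch")
    try Await.result(Source(inputs).via(aToSeqOfC).runWith(Sink.seq[Seq[Int]]), 5.seconds)
    finally system.terminate()
  }
}
```

Each input element yields exactly one Seq[C] of the same length as its Seq[B], because the inner `map` never flattens the sequence into the stream.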

Direct Answer

If you cannot reorganize your code, then the only available option is to deal with Futures. You will need to run bToC within a separate sub-stream for each Seq[B]:

val mat : akka.stream.Materializer = ???

val seqBToSeqC : Seq[B] => Future[Seq[C]] = 
  (seqB) =>
    Source
      .apply(seqB.toList) //Source.apply needs an immutable Iterable
      .via(bToC)
      //runWith keeps the Sink's Future; .to(...).run() would return NotUsed
      .runWith(Sink.seq[C])(mat)

You can then use this function within a mapAsync to construct the Flow you are looking for:

val parallelism = 10

val aToSeqOfC: Flow[A, Seq[C], NotUsed] = 
  aToSeqOfB.mapAsync(parallelism)(seqBToSeqC)
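As an end-to-end illustration of the sub-stream approach, the sketch below treats bToC as an opaque Flow and runs it once per Seq[B]. The concrete types, the `DirectSketch` and `run` names, and the parallelism of 4 are assumptions chosen for the example, and it again assumes Akka 2.6 or later for the implicit Materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable.Seq
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DirectSketch {
  // Hypothetical stand-ins: A = String, B = String, C = Int
  val aToSeqOfB: Flow[String, Seq[String], NotUsed] =
    Flow[String].map(_.split(" ").toList)

  // Treated as opaque here: we only have the Flow, not the underlying function
  val bToC: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  def run(inputs: List[String]): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("direct-sketch")

    // Materialize one small sub-stream per Seq[B] and collect its output
    val seqBToSeqC: Seq[String] => Future[Seq[Int]] =
      seqB => Source(seqB).via(bToC).runWith(Sink.seq[Int])

    // mapAsync emits results in upstream order, even when Futures finish out of order
    val aToSeqOfC: Flow[String, Seq[Int], NotUsed] =
      aToSeqOfB.mapAsync(4)(seqBToSeqC)

    try Await.result(Source(inputs).via(aToSeqOfC).runWith(Sink.seq[Seq[Int]]), 5.seconds)
    finally system.terminate()
  }
}
```

Note that this materializes a new stream for every upstream element, which costs more than mapping a plain function over the Seq.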

Upvotes: 4
