anindyaju99
anindyaju99

Reputation: 475

Akka stream - List to mapAsync of individual elements

My stream has a Flow whose outputs are List[Any] objects. I want to have a mapAsync followed by some other stages each of which processed an individual element instead of the list. How can I do that?

Effectively I want to connect the output of

Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}

to be consumed by -

Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff

How do I do this?

Upvotes: 7

Views: 5035

Answers (1)

cmbaxter
cmbaxter

Reputation: 35463

I think the combinator you are looking for is mapConcat. This combinator will take an input argument and return something that is an Iterable. A simple example would be as follows:

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)

val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run

Here, my Source is spitting out List elements, and my Sink accepts the underlying type of what's in those Lists. I can't connect them directly together as the types are different. But if I apply mapConcat between them, they can be connected as that combinator will flatten those List elements out, sending their individual elements (Int) downstream. Because the input element to mapConcat is already an Iterable, then you only need to use the identify function in the body of mapConcat to make things work.

Upvotes: 13

Related Questions