Reputation: 475
My stream has a Flow whose outputs are List[Any] objects. I want to follow it with a mapAsync and some other stages, each of which processes an individual element instead of the whole list. How can I do that?
Effectively, I want to connect the output of
Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}
so that it is consumed by:
Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff
How do I do this?
Upvotes: 7
Views: 5035
Reputation: 35463
I think the combinator you are looking for is mapConcat. It takes a function that maps each input element to an Iterable, and it emits the elements of that Iterable downstream one by one. A simple example would be as follows:
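import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
// Each source element is a whole List[Int]
val source = Source(List(List(1,2,3), List(4,5,6)))
// The sink consumes individual Ints
val sink = Sink.foreach[Int](println)
// mapConcat(identity) flattens each List[Int] into its elements
val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run()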
Here, my Source emits List elements, and my Sink accepts the underlying type of what's in those Lists. I can't connect them directly because the types differ. But if I apply mapConcat between them, they can be connected, since that combinator flattens those List elements and sends their individual elements (Int) downstream. Because the input element to mapConcat is already an Iterable, you only need to use the identity function in the body of mapConcat to make things work.
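Applied to the shape in the question, you could also fold the list-producing map and the flattening into a single mapConcat and put mapAsyncUnordered right after it. What follows is only a rough sketch under some assumptions: someListDerivedFrom is given a made-up body, the element type is narrowed to Int, and a plain Future-returning function named process (hypothetical) stands in for actorRef ? listElement so that the snippet does not need an actor or an ask timeout.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("example")
// Same materializer setup as in the example above; newer Akka versions
// can derive a materializer from the implicit ActorSystem instead.
implicit val mater: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future below

// Assumption: made-up stand-in for the question's someListDerivedFrom(msg)
def someListDerivedFrom(msg: Int): List[Int] = List(msg, msg * 10)

// Assumption: hypothetical stand-in for `actorRef ? listElement`
def process(listElement: Int): Future[Int] = Future(listElement + 1)

val flattenAndProcess: Flow[Int, Int, NotUsed] =
  Flow[Int]
    // Build the list and emit its elements one at a time
    .mapConcat(msg => someListDerivedFrom(msg))
    // Each element is now processed individually, up to 4 in flight
    .mapAsyncUnordered(4)(listElement => process(listElement))

val result =
  Await.result(Source(List(1, 2)).via(flattenAndProcess).runWith(Sink.seq), 5.seconds)

// mapAsyncUnordered does not preserve order, so sort before inspecting
println(result.sorted)

system.terminate()
```

If the stage that produces the list has to stay a separate map, then .map(someListDerivedFrom).mapConcat(identity) should behave the same way, as in the example above.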
Upvotes: 13