Reputation: 6763
Given a flow description in akka-streams
val flow: Flow[Input, Output, Unit] = ???
, how do I modify it to get a new flow description that performs a specified side effect on start, i.e. when the flow is materialized?
Upvotes: 0
Views: 556
Reputation: 15472
Starting materialization of a stream processing graph will set it in motion piece by piece, concurrently. The only way to perform an action that is guaranteed to happen before the first element is passed somewhere within that graph is to perform that action before materializing the graph. In this sense the answer by sschaef is slightly incorrect: using mapMaterializedValue
runs the action pretty early, but not in a way that guarantees it happens before the first element is processed.
If we are talking about a Flow
here, which only takes inputs on one side and produces outputs on the other (i.e. it does not contain internal cycles or data sources), then one thing you can do to perform an action before the first element arrives is to attach a processing step to its input that does that:
def effectSource[T](block: => Unit) = Source.fromIterator(() => {block; Iterator.empty})
val newFlow = Flow[Input].prepend(effectSource(/* do stuff */)).via(flow)
Note
The above uses the upcoming 2.0 syntax; in Akka Streams 1.0 it would be Source(() => { block; Iterator.empty })
and the prepend operation would need to be done using the FlowGraph DSL (the graph can be found here).
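For anyone who wants to try the prepend approach end to end, here is a minimal sketch. It assumes a later Akka release (2.6-style API), where the materialized type is NotUsed rather than Unit and an implicit ActorSystem supplies the materializer. The Int/String flow, the object name, and the withStartEffect and run helpers are hypothetical stand-ins, not part of the original answer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PrependEffectSketch {
  // Stand-in for the question's `flow`; the element types are assumptions.
  val flow: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"element $i")

  // An empty source whose only job is to run `block` when its iterator is created.
  def effectSource[T](block: => Unit): Source[T, NotUsed] =
    Source.fromIterator(() => { block; Iterator.empty })

  // Upstream elements reach `flow` only after the prepended (empty) source completes.
  def withStartEffect(effect: => Unit): Flow[Int, String, NotUsed] =
    Flow[Int].prepend(effectSource[Int](effect)).via(flow)

  // Runs the wrapped flow once and returns everything it emitted.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("prepend-effect-sketch")
    try {
      val result = Source(1 to 3)
        .via(withStartEffect(println("side effect on start")))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling PrependEffectSketch.run() should print the message before any element flows through, and the effect runs again on every materialization because the iterator factory is invoked each time.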
Upvotes: 5
Reputation: 53348
You said it yourself, use the force of materialization:
val newFlow = flow.mapMaterializedValue(_ ⇒ println("materialized"))
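To see when that callback actually fires, here is a rough sketch, again assuming a 2.6-style API (NotUsed, implicit ActorSystem as materializer). The counter, the doubling flow, and the object and method names are made up for illustration. It only shows that the function runs once per materialization; it says nothing about ordering relative to the first element.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import java.util.concurrent.atomic.AtomicInteger

object MatValueEffectSketch {
  // Hypothetical side effect: count how often the flow has been materialized.
  val materializations = new AtomicInteger(0)

  // Stand-in for the question's `flow`.
  val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // The function passed here is invoked each time the blueprint is materialized.
  val newFlow: Flow[Int, Int, Int] =
    flow.mapMaterializedValue(_ => materializations.incrementAndGet())

  // Materializes the same blueprint twice and returns the second counter value.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("mat-value-effect-sketch")
    try {
      // Keep.right makes the flow's materialized value the result of run().
      val graph = Source(1 to 3).viaMat(newFlow)(Keep.right).to(Sink.ignore)
      graph.run()
      graph.run()
    } finally system.terminate()
  }
}
```

Because a flow is only a blueprint, reusing newFlow in several graphs triggers the side effect once for each of them.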
Upvotes: 3