Reputation: 3116
I'm playing with Akka Streams and curious about constructing a Flow
through a function with parameters. Parameters should be passed from the previous one stream stage.
Let's assume that we have following inputs and outputs:
case class InitElement(v: Int)
trait StreamResult
case class StageA(v: Int) extends StreamResult
case class StageB(v: Int) extends StreamResult
trait StreamFailure extends StreamResult { val msg: String }
case class StageAFailure(msg: String) extends StreamFailure
case class StageBFailure(msg: String) extends StreamFailure
Stream stages for them look like:
val stageA = Flow[InitElement].map {
init =>
if (init.v > 10)
StageA(init.v)
else
StageAFailure(s"${init.v} less than 10")
}
def stageB(input: StreamResult)(externalService: Set[Int]) = Flow[StreamResult].map {
case failure: StreamFailure => failure
case StageA(v) =>
if (externalService.contains(v))
StageB(v)
else
StageBFailure(s"$v is absent in external service")
}
val graph = Source.single(InitElement(12))
.via(stageA)
//How can I pass output of previous stage into 'stageB' function?
.via(stageB(_)(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)
I don't see any way to pass result of stageA
into the function which constructs the next stage.
How can I implement this?
Upvotes: 0
Views: 733
Reputation: 2081
Your definition of stageB
builds a Flow[StreamResult, StreamResult, _]
. It doesn't need to take (input: StreamResult)
as a parameter. Note that you don't use input
anywhere in stageB
definition. The element comes from the Flow[StreamResult]
on which you will map
.
This should be enough:
def stageB(externalService: Set[Int]) = … // As you wrote it
val graph = Source.single(InitElement(12))
.via(stageA)
.via(stageB(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)
Upvotes: 3