Alex Fruzenshtein

Reputation: 3116

How to pass parameters in Akka Stream Flow from a previous stage?

I'm playing with Akka Streams and I'm curious about constructing a Flow through a function that takes parameters. The parameters should come from the previous stream stage.

Let's assume that we have the following inputs and outputs:

case class InitElement(v: Int)

trait StreamResult
case class StageA(v: Int) extends StreamResult
case class StageB(v: Int) extends StreamResult

trait StreamFailure extends StreamResult { val msg: String }
case class StageAFailure(msg: String) extends StreamFailure
case class StageBFailure(msg: String) extends StreamFailure

Stream stages for them look like:

val stageA = Flow[InitElement].map { init =>
  if (init.v > 10)
    StageA(init.v)
  else
    StageAFailure(s"${init.v} less than 10")
}

def stageB(input: StreamResult)(externalService: Set[Int]) = Flow[StreamResult].map {
  case failure: StreamFailure => failure
  case StageA(v) =>
    if (externalService.contains(v))
      StageB(v)
    else
      StageBFailure(s"$v is absent in external service")
}

val graph = Source.single(InitElement(12))
  .via(stageA)
  // How can I pass output of previous stage into 'stageB' function?
  .via(stageB(_)(Set(11, 15, 20)))
  .toMat(Sink.head)(Keep.right)

I don't see any way to pass the result of stageA into the function that constructs the next stage.

How can I implement this?

Upvotes: 0

Views: 733

Answers (1)

Samuel Tardieu

Reputation: 2081

Your definition of stageB builds a Flow[StreamResult, StreamResult, _], so it doesn't need to take (input: StreamResult) as a parameter. Note that input is never used in the body of stageB. Each element arrives through the Flow[StreamResult] that you call map on.

This should be enough:

def stageB(externalService: Set[Int]) = … // As you wrote it

val graph = Source.single(InitElement(12))
  .via(stageA)
  .via(stageB(Set(11, 15, 20)))
  .toMat(Sink.head)(Keep.right)
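
For completeness, here is a minimal, self-contained sketch of the whole thing, under a few assumptions of mine: Akka 2.6 or later (so the implicit ActorSystem is enough to materialize the stream), a hypothetical wrapper object named StageExample, and an extra fallback case in stageB that passes unknown elements through unchanged. That fallback is not in your code; I added it only so the pattern match cannot throw a MatchError on an unexpected element.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only to make the sketch runnable.
object StageExample {
  // Types copied from the question.
  case class InitElement(v: Int)

  trait StreamResult
  case class StageA(v: Int) extends StreamResult
  case class StageB(v: Int) extends StreamResult

  trait StreamFailure extends StreamResult { val msg: String }
  case class StageAFailure(msg: String) extends StreamFailure
  case class StageBFailure(msg: String) extends StreamFailure

  // First stage: same logic as in the question.
  val stageA: Flow[InitElement, StreamResult, NotUsed] =
    Flow[InitElement].map { init =>
      if (init.v > 10) StageA(init.v)
      else StageAFailure(s"${init.v} less than 10")
    }

  // Second stage: only the external configuration is a parameter.
  // The upstream element is delivered by the Flow itself.
  def stageB(externalService: Set[Int]): Flow[StreamResult, StreamResult, NotUsed] =
    Flow[StreamResult].map {
      case failure: StreamFailure => failure
      case StageA(v) =>
        if (externalService.contains(v)) StageB(v)
        else StageBFailure(s"$v is absent in external service")
      case other => other // assumption: pass anything else through untouched
    }

  def main(args: Array[String]): Unit = {
    // Assumes Akka 2.6+, where the system provides the materializer implicitly.
    implicit val system: ActorSystem = ActorSystem("stage-example")
    try {
      val graph = Source.single(InitElement(12))
        .via(stageA)
        .via(stageB(Set(11, 15, 20)))
        .toMat(Sink.head)(Keep.right)

      // Blocking here is only for the demo.
      val result = Await.result(graph.run(), 3.seconds)
      println(result) // prints StageBFailure(12 is absent in external service)
    } finally {
      system.terminate()
    }
  }
}
```

With your inputs, 12 passes stageA but is not in Set(11, 15, 20), so the stream ends with a StageBFailure. The point is that stageB is parameterized only by things known when the graph is built; the per-element data always flows through the stream.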

Upvotes: 3
