marcprux

Reputation: 10355

Akka streams flattening Flows via

Flows can be connected with via, like this:

def aToB: Flow[A, B, NotUsed] = { ??? }  
def bToC: Flow[B, C, NotUsed] = { ??? }  
def aToC: Flow[A, C, NotUsed] = { aToB.via(bToC) }  

I would like to do the equivalent of flatMap:

def aToSomeB: Flow[A, Some[B], NotUsed] = { ??? }  
def aToSomeC: Flow[A, Some[C], NotUsed] = { aToSomeB.flatVia(bToC) }

Is there some built-in way to do flatVia? It seems like a common need for things like Option unwrapping and error flattening.

Upvotes: 2

Views: 1888

Answers (1)

Stefano Bonetti

Reputation: 9023

It depends on whether you want to keep the Nones around or throw them away.

As you typed your flow as Flow[A, Some[C], NotUsed], it seems you are not interested in Nones at all. This means you can easily filter them out with collect, e.g.

def aToSomeC: Flow[A, C, NotUsed] = { aToSomeB.collect{case Some(x) ⇒ x}.via(bToC) }

If, on the other hand, you need to keep track of the Nones (or the Lefts if you're dealing with Eithers), you'll need to write the "lifting" stage yourself. This can be done fairly generically, for example as a function that takes any flow Flow[I, O, M] and returns another flow Flow[Either[E, I], Either[E, O], M]. Because it needs fan-out and fan-in stages, it has to be built with GraphDSL.

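  import akka.stream.{FlowShape, Graph}
  import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

  def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
    Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>
      import GraphDSL.Implicits._ // brings ~> into scope

      val fIn      = builder.add(Flow[Either[E, I]])
      // Route Lefts to output 0 and Rights to output 1
      val p        = builder.add(Partition[Either[E, I]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
      val merge    = builder.add(Merge[Either[E, O]](2))
      val toRight  = builder.add(Flow[O].map(Right(_)))

      // Lefts bypass f; Rights are unwrapped, passed through f, and re-wrapped
                 p.out(0).collect{case Left(x) ⇒ Left(x)}             ~> merge
      fIn.out ~> p.in
                 p.out(1).collect{case(Right(x)) ⇒ x} ~> f ~> toRight ~> merge

      new FlowShape(fIn.in, merge.out)
    })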

It can be used as follows:

  def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
  def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))

Note that Options can easily be converted to Eithers to leverage the same helper function.
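
As a rough illustration of that conversion, here is a minimal sketch using only the Scala standard library (assuming Scala 2.12 or later, where Either has toOption). The object name OptionLifting and the helpers optionToEither and eitherToOption are made up for this example and are not part of Akka; Unit is used as the Left type simply because a None carries no information.

```scala
object OptionLifting {
  // Hypothetical helper: None becomes Left(()), Some(x) becomes Right(x).
  def optionToEither[T](opt: Option[T]): Either[Unit, T] = opt.toRight(())

  // Hypothetical inverse: Right(x) becomes Some(x), any Left becomes None.
  def eitherToOption[T](e: Either[Unit, T]): Option[T] = e.toOption
}
```

In a stream, these would presumably be applied with map on either side of the lifted flow: map the Option elements to Eithers, run them through liftEither(bToC), then map the results back to Options.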

Upvotes: 3
