Reputation: 10355
Flows can be connected with via
like:
def aToB: Flow[A, B, NotUsed] = { ??? }
def bToC: Flow[B, C, NotUsed] = { ??? }
def aToC: Flow[A, C, NotUsed] = { aToB.via(bToC) }
I would like to do the equivalent of flatMap
:
def aToSomeB: Flow[A, Some[B], NotUsed] = { ??? }
def aToSomeC: Flow[A, Some[C], NotUsed] = { aToSomeB.flatVia(bToC) }
Is there some built-in way to do flatVia
? It seems like a common need for things like Option
unwrapping and error flattening.
Upvotes: 2
Views: 1888
Reputation: 9023
It really depends if you are interested in keeping those None
s around, or if you want to throw them away.
As you typed your flow as Flow[A, Some[C], NotUsed]
is seems you are not interested in None
s at all. This means you can easily filter them out with collect
, e.g.
def aToSomeC: Flow[A, C, NotUsed] = { aToSomeB.collect{case Some(x) ⇒ x}.via(bToC) }
If, otherwise, you need to track the None
s (or the Left
s if you're dealing with Either
s), you'll need to write your "lifting" stage yourself. This can be written fairly generically. For example, it can be written as a function that takes any flow Flow[I, O, M]
and returns another flow Flow[Either[E, I], Either[E, O], M
]. Because it requires fan-out and fan-in stages, usage of GraphDSL is required.
def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>
val fIn = builder.add(Flow[Either[E, I]])
val p = builder.add(Partition[Either[E, I]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
val merge = builder.add(Merge[Either[E, O]](2))
val toRight = builder.add(Flow[O].map(Right(_)))
p.out(0).collect{case Left(x) ⇒ Left(x)} ~> merge
fIn.out ~> p.in
p.out(1).collect{case(Right(x)) ⇒ x} ~> f ~> toRight ~> merge
new FlowShape(fIn.in, merge.out)
})
This can be used as per below
def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))
Note that Option
s can easily be converted to Either
s to leverage the same helper function.
Upvotes: 3