ksc0pe

Reputation: 113

When building a FlowGraph, how can two Flows be joined?

I am trying to construct a simple flow with one source, one sink, and two Flows between them, something like:

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.ignore
  val f1 = Flow[Int].map(_ + 1)
  val f2 = Flow[Int].map(_ + 2)
  builder.addEdge(builder.add(in), f1, builder.add(out))

  // builder.addEdge(builder.add(in), f1, f2, builder.add(out)) // does not compile

}.run

The commented line does not compile but demonstrates what I am trying to achieve.

The example is contrived in that it would be just as easy to define a single function that adds 3, or to compose the two functions. In reality, however, the functions are much more complicated and are kept separate for simplicity.

I am not looking to do fan-out or fan-in here, just a straight flow with an arbitrary number of functions between the source and the sink.

Thanks.

Upvotes: 2

Views: 275

Answers (2)

insdami

Reputation: 11

The problem here is you need a FlowShape.

There are two ways to use addEdge:

def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit

and

def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit

To do what you want with the builder, you can create two FlowShapes and connect them using the (from: Outlet[T], to: Inlet[T]) overload.

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.foreach(println)

  // val f1: Flow[Int, Int, Unit] = Flow[Int].map(_ + 1)
  // val f2: Flow[Int, Int, Unit] = Flow[Int].map(_ + 2)

  val f1: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
  val f2: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 2))
  builder.addEdge(builder.add(in), f1.inlet)   // source to f1 in
  builder.addEdge(f1.outlet, f2.inlet)         // f1 out to f2 in
  builder.addEdge(f2.outlet, builder.add(out)) // f2 out to sink
}.run()

I left the types so you can see the difference.
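For comparison, the same straight-line wiring can probably be written more compactly with the graph DSL's ~> operator. This is only a sketch, assuming the same Akka Streams 1.0-era API as the snippet above (FlowGraph.closed, FlowGraph.Implicits, ActorMaterializer); the actor system name is arbitrary, and later releases renamed several of these pieces.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

// Assumption: Akka Streams 1.0-era API, matching the snippet above.
implicit val system = ActorSystem("flow-join-example") // name is arbitrary
implicit val materializer = ActorMaterializer()

val graph = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._ // brings the ~> operator into scope

  val in = Source(1 to 10)
  val out = Sink.foreach[Int](println)
  val f1 = Flow[Int].map(_ + 1)
  val f2 = Flow[Int].map(_ + 2)

  // Straight line: the source, then each flow in turn, then the sink.
  in ~> f1 ~> f2 ~> out
}

graph.run()
```

Here the builder is marked implicit because ~> needs it in scope, and plain Flows can be chained without first turning them into FlowShapes.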

A second option is to create the FlowShape from a partial graph.

val partialFlow: Graph[FlowShape[Int, Int], Unit] = FlowGraph.partial() { builder =>
  val f1 = builder.add(Flow[Int].map(_ + 1))
  val f2 = builder.add(Flow[Int].map(_ + 2))
  builder.addEdge(f1.outlet, f2.inlet)

  FlowShape(f1.inlet, f2.outlet)
}

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.foreach(println)
  builder.addEdge(builder.add(in), partialFlow, builder.add(out))
}.run()

Upvotes: 1

Rex Kerr

Reputation: 167901

The via method on Flow should do what you want (i.e. f1 via f2).

See the scaladocs.
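As a rough illustration of via, the sketch below joins the two flows from the question and runs them without a graph builder. It assumes an Akka Streams 1.0-era setup (ActorMaterializer, Source(1 to 10)); the actor system name is arbitrary and not from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

// Assumption: Akka Streams 1.0-era setup; the system name is arbitrary.
implicit val system = ActorSystem("via-example")
implicit val materializer = ActorMaterializer()

val f1 = Flow[Int].map(_ + 1)
val f2 = Flow[Int].map(_ + 2)

// Joining two flows yields another flow, so further stages can be
// appended with additional via calls.
val joined = f1 via f2

// A linear pipeline needs no FlowGraph builder at all.
Source(1 to 10).via(joined).runWith(Sink.foreach[Int](println)) // prints 4 through 13
```

The actor system should be shut down once the stream completes; the method for that differs between Akka versions, so it is left out here.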

Note that you can also

val f = Flow[Int].
  map(_ + 1).
  map(_ + 2)

if you want to keep your separation. Or if you extract the functions as g1 and g2, you can also

val g1 = (i: Int) => i + 1
val g2 = (i: Int) => i + 2
val f = Flow[Int].map(g1 andThen g2)

In general, I would recommend working with functions as much as possible and saving flows for when you really need them.
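For an arbitrary number of steps, the same idea can be sketched with plain functions from the standard library. Function.chain applies a sequence of functions from left to right; g3 is a hypothetical third step added only for illustration.

```scala
val g1 = (i: Int) => i + 1
val g2 = (i: Int) => i + 2
val g3 = (i: Int) => i * 10 // hypothetical extra step, not from the question

// Any number of Int => Int steps, kept separate but applied in order.
val steps: Seq[Int => Int] = Seq(g1, g2, g3)
val combined: Int => Int = Function.chain(steps)

println(combined(1)) // ((1 + 1) + 2) * 10 = 40

// The combined function could then be passed to a single stage,
// e.g. Flow[Int].map(combined).
```

Function.chain only works when every step has the same input and output type; steps that change the type would need to be composed pairwise with andThen instead.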

Upvotes: 2
