Asa
Asa

Reputation: 1466

How do I create a Flow with a different input and output types for use inside of a graph?

I am making a custom sink by building a graph on the inside. Here is a broad simplification of my code to demonstrate my question:

def mySink: Sink[Int, Unit] = Sink() { implicit builder =>

    val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
    val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
    val printSink = builder.add(Sink.foreach(elem => println(elem)))

    builder.addEdge(entrance.out, toString.in)
    builder.addEdge(toString.out, printSink.in)

    entrance.in
}

The problem I am having is that while it is valid to create a Flow with the same input/output types with only a single type argument and no value argument like: Flow[Int] (which is all over the documentation) it is not valid to only supply two type parameters and zero value parameters.

According to the reference documentation for the Flow object the apply method I am looking for is defined as

def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]

and says

Creates a Flow by passing a FlowGraph.Builder to the given create function.

The create function is expected to return a pair of Inlet and Outlet which correspond to the created Flows input and output ports.

It seems like I need to deal with another level of graph builders when I am trying to make what I think is a very simple flow. Is there an easier and more concise way to create a Flow that changes the type of it's input and output that doesn't require messing with it's inside ports? If this is the right way to approach this problem, what would a solution look like?

BONUS: Why is it easy to make a Flow that doesn't change the type of its input from it's output?

Upvotes: 2

Views: 1585

Answers (1)

Jos Dirksen
Jos Dirksen

Reputation: 1903

If you want to specify both the input and the output type of a flow, you indeed need to use the apply method you found in the documentation. Using it, though, is done pretty much exactly the same as you already did.

Flow[String, Message]() { implicit b =>
  import FlowGraph.Implicits._

  val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
  val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

  // connect the graph
  reverseString ~> mapStringToMsg

  // expose ports
  (reverseString.inlet, mapStringToMsg.outlet)
}

Instead of just returning the inlet, you return a tuple, with the inlet and the outlet. This flow can now we used (for instance inside another builder, or directly with runWith) with a specific Source or Sink.

Upvotes: 3

Related Questions