Walter Chang

Reputation: 11596

Problems with Akka Streams' GraphStage

I need to write a GraphStage but ran into some problems. I've condensed the code into the sample below and hope you can shed some light on it.

The sample code is not my real use case; it is only here to demonstrate my point. Hopefully this is something I don't understand about Akka Streams and not one of its limitations.

The sample code builds a graph with a WrapFlowShape. It redirects the "in" of the graph to the inlet of an attached flow, and the outlet of that flow to the "out" of the graph.

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.immutable
import scala.io.StdIn

object WrapFlowSandbox extends App {
  case class WrapFlowShape[I, O](
      in: Inlet[I],
      out: Outlet[O],
      flowIn: Inlet[O],
      flowOut: Outlet[I]) extends Shape {
    val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
    val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
    def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy)
  }
  class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] {
    val in: Inlet[I] = Inlet[I]("WrapFlow.in")
    val out: Outlet[O] = Outlet[O]("WrapFlow.out")
    val flowIn: Inlet[O] = Inlet[O]("WrapFlow.flowIn")
    val flowOut: Outlet[I] = Outlet[I]("WrapFlow.flowOut")
    val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)
    def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      var inElem: I = _
      setHandler(in, new InHandler {
        def onPush = {
          println("2 in.onPush")
          inElem = grab(in)
          pull(flowIn)
        }
      })
      setHandler(out, new OutHandler {
        def onPull = {
          println("1 out.onPull")
          pull(in)
        }
      })
      setHandler(flowIn, new InHandler {
        def onPush = {
          println("4 flowIn.onPush")
          val outElem = grab(flowIn)
          push(out, outElem)
        }
      })
      setHandler(flowOut, new OutHandler {
        def onPull = {
          println("3 flowOut.onPull")
          push(flowOut, inElem)
        }
      })
    }
  }
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  val flow = Flow[Int].map(_ + 1)
  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val select = b.add(new WrapFlow[Int, Int])
    Source.single(1) ~> select.in
    select.out ~> Sink.foreach[Int](r => println(s"result = $r"))
    select.flowOut ~> flow ~> select.flowIn
    ClosedShape
  }).run(materializer)
  StdIn.readLine()
  system.terminate()
}

The output I expected to see is:

1 out.onPull
2 in.onPush
3 flowOut.onPull
4 flowIn.onPush
result = 2

But the actual output is just the first 3 lines:

1 out.onPull
2 in.onPush
3 flowOut.onPull

InHandler.onPush() for "flowIn" is never called.

I know it is unconventional to write a GraphStage this way, but I do have a need for it.

What puzzles me is that I generated demand for the attached flow by pulling on it in step 2 (pull(flowIn)), and the attached flow in turn generated demand on "flowOut" in step 3.

But after I pushed an element through flowOut in step 3, the element never arrived at flowIn, so step 4 was never executed.

Why is that?

If the attached flow senses demand downstream and generates demand upstream at step 3, why doesn't the element pushed at step 3 get through the attached flow?

Upvotes: 1

Views: 864

Answers (1)

Leo C

Reputation: 22439

I'm not sure I follow the logic in your handlers. I revised them as follows, based on what I understand from the wiring in your GraphDSL.create() block:

def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  var inElem: I = _
  setHandler(in, new InHandler {
    def onPush = {
      println("in.onPush")
      inElem = grab(in)
      push(flowOut, inElem)
    }
  })
  setHandler(out, new OutHandler {
    def onPull = {
      println("out.onPull")
      pull(flowIn)
    }
  })
  setHandler(flowIn, new InHandler {
    def onPush = {
      println("flowIn.onPush")
      val outElem = grab(flowIn)
      push(out, outElem)
    }
  })
  setHandler(flowOut, new OutHandler {
    def onPull = {
      println("flowOut.onPull")
      pull(in)
    }
  })
}

Executing it should produce the following output:

out.onPull
flowOut.onPull
in.onPush
flowIn.onPush
result = 2

I also noticed that the method copyFromPorts() isn't overridden in your WrapFlowShape case class, which is a concrete class. If the Shape class in your Akka version declares that method, I believe you'll need to override it with something like the following:

override def copyFromPorts(
    inlets: immutable.Seq[Inlet[_]],
    outlets: immutable.Seq[Outlet[_]]) = {
  WrapFlowShape[I, O](
    inlets(0).as[I],
    outlets(0).as[O],
    inlets(1).as[O],
    outlets(1).as[I])
}
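
A possible factor in the original behaviour, which I have not verified against your exact sample: the default InHandler.onUpstreamFinish() calls completeStage(), and Source.single completes right after pushing its only element. If that completion reaches "in" while the element is still travelling through the attached flow, the stage shuts down before flowIn.onPush can run. The sketch below keeps your original handler order but defers completion. The class name WrapFlowKeepAlive is hypothetical, and the code assumes the Akka 2.5-style Shape from the question (deepCopy only) and a source that emits at least one element.

```scala
import akka.stream.{Attributes, Inlet, Outlet, Shape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.immutable

// Same shape as in the question (assumes an Akka version where Shape only needs deepCopy).
case class WrapFlowShape[I, O](
    in: Inlet[I],
    out: Outlet[O],
    flowIn: Inlet[O],
    flowOut: Outlet[I]) extends Shape {
  val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
  val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
  def deepCopy(): WrapFlowShape[I, O] =
    WrapFlowShape(in.carbonCopy(), out.carbonCopy(), flowIn.carbonCopy(), flowOut.carbonCopy())
}

// Hypothetical variant of WrapFlow: identical handlers, but completion of `in` is deferred.
class WrapFlowKeepAlive[I, O] extends GraphStage[WrapFlowShape[I, O]] {
  val in: Inlet[I] = Inlet[I]("WrapFlow.in")
  val out: Outlet[O] = Outlet[O]("WrapFlow.out")
  val flowIn: Inlet[O] = Inlet[O]("WrapFlow.flowIn")
  val flowOut: Outlet[I] = Outlet[I]("WrapFlow.flowOut")
  val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)

  def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    var inElem: I = _

    setHandler(in, new InHandler {
      def onPush(): Unit = {
        inElem = grab(in)
        pull(flowIn)
      }
      // The default implementation calls completeStage(). Do nothing here so an
      // element that is still inside the attached flow is not dropped.
      override def onUpstreamFinish(): Unit = ()
    })

    setHandler(out, new OutHandler {
      def onPull(): Unit = pull(in)
    })

    setHandler(flowIn, new InHandler {
      def onPush(): Unit = {
        push(out, grab(flowIn))
        // Finish only after the in-flight element has been emitted downstream.
        if (isClosed(in)) completeStage()
      }
    })

    setHandler(flowOut, new OutHandler {
      def onPull(): Unit = push(flowOut, inElem)
    })
  }
}
```

To try it, replace new WrapFlow[Int, Int] with new WrapFlowKeepAlive[Int, Int] in the GraphDSL block. Note that this sketch never completes if upstream finishes without emitting anything, so a real stage would need to handle that case as well.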

Upvotes: 1
