Reputation: 11596
i need to write a GraphStage but ran into some problems. i've condensed the code to below and hope you guys can shed some light on it for me.
the sample code below is not my real use case, it is just here to demonstrate my point. hopefully it is something i don't understand about akka streams and not it's limitations.
the sample code builds a Graph with a WrapFlowShape and basically redirects the "in" of the graph to the in of an attach flow and the "out" of the graph to the out of the flow.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
import scala.io.StdIn
object WrapFlowSandbox extends App {
case class WrapFlowShape[I, O](
in: Inlet[I],
out: Outlet[O],
flowIn: Inlet[O],
flowOut: Outlet[I]) extends Shape {
val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy)
}
class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] {
val in: Inlet[I] = Inlet[I]("WrapFlow.in")
val out: Outlet[O] = Outlet[O]("WrapFlow.out")
val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn")
val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut")
val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)
def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var inElem: I = _
setHandler(in, new InHandler {
def onPush = {
println("2 in.onPush")
inElem = grab(in)
pull(flowIn)
}
})
setHandler(out, new OutHandler {
def onPull = {
println("1 out.onPull")
pull(in)
}
})
setHandler(flowIn, new InHandler {
def onPush = {
println("4 flowIn.onPush")
val outElem = grab(flowIn)
push(out, outElem)
}
})
setHandler(flowOut, new OutHandler {
def onPull = {
println("3 flowOut.onPull")
push(flowOut, inElem)
}
})
}
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val flow = Flow[Int].map(_ + 1)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val select = b.add(new WrapFlow[Int, Int])
Source.single(1) ~> select.in
select.out ~> Sink.foreach[Int](r => println(s"result = $r"))
select.flowOut ~> flow ~> select.flowIn
ClosedShape
}).run(materializer)
StdIn.readLine
system.terminate
}
the output i expected to see is:
1 out.onPull
2 in.onPush
3 flowOut.onPull
4 flowIn.onPush
result = 2
but the actual output is just the first 3 lines:
1 out.onPull
2 in.onPush
3 flowOut.onPull
InHandler.onPush() for "flowIn" is never called.
i know it is unconventional to write a GraphStage this way, but i do have a need for it.
what puzzles me is that i generated a demand for the attached flow by pulling on it in step 2 (pull(flowIn)), and the attached flow in turn generated a demand for "flowOut" in step 3.
but after pushing an element through flowOut in step 3, the element was never pushed so step 4 was never executed.
why is that?
if the attached flow senses a demand downstream and generates a demand upstream at step 3, why doesn't the element pushed at step 3 get through to the attached stream?
Upvotes: 1
Views: 864
Reputation: 22439
Not sure I follow the logic in your Handlers. I revised them to the following based on what I understand from your GraphDSL.create()
content:
def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var inElem: I = _
setHandler(in, new InHandler {
def onPush = {
println("in.onPush")
inElem = grab(in)
push(flowOut, inElem)
}
})
setHandler(out, new OutHandler {
def onPull = {
println("out.onPull")
pull(flowIn)
}
})
setHandler(flowIn, new InHandler {
def onPush = {
println("flowIn.onPush")
val outElem = grab(flowIn)
push(out, outElem)
}
})
setHandler(flowOut, new OutHandler {
def onPull = {
println("flowOut.onPull")
pull(in)
}
})
}
Executing it should produce the following output:
out.onPull
flowOut.onPull
in.onPush
flowIn.onPush
result = 2
Noticed that method copyFromPorts()
wasn't overridden in your WrapFlowShape
case class (which isn't an abstract class). I believe you'll need to override it with something like the following:
override def copyFromPorts(
inlets: immutable.Seq[Inlet[_]],
outlets: immutable.Seq[Outlet[_]]) = {
WrapFlowShape[I, O](
inlets(0).as[I],
outlets(0).as[O],
inlets(1).as[O],
outlets(1).as[I])
}
Upvotes: 1