Greg

Reputation: 11542

Is there a higher-level way to write a custom GraphStage?

I'm trying to understand some of the newer bits of Akka Streams. I have a custom FanOutShape2 stage that does something very simple: it takes an input of (Boolean, Option[A_Thing]) and decides whether to route each element to out0 or out1 (pass or fail), like this:

object PassFilter {
  type FilterShape = FanOutShape2[(Boolean, Option[OutputWrapper]), OutputWrapper, akka.NotUsed]
}
import PassFilter._

case class PassFilter()(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
  val mergedIn: Inlet[(Boolean, Option[OutputWrapper])] = Inlet("Merged")
  val outPass: Outlet[OutputWrapper] = Outlet("Pass")
  val outFail: Outlet[akka.NotUsed] = Outlet("Fail")

  override val shape: FilterShape = new FanOutShape2(mergedIn, outPass, outFail)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    override def preStart(): Unit = pull(mergedIn)
    setHandler(mergedIn, new InHandler {
      override def onPush(): Unit = {
        val (passedPrivacy, outWrapper) = grab(mergedIn)
        if (!passedPrivacy || outWrapper.isEmpty)
          push(outFail, akka.NotUsed)
        else
          push(outPass, outWrapper.get)
        pull(mergedIn)
      }
      override def onUpstreamFinish(): Unit = {} // necessary for some reason!
    })
    setHandler(outPass, eagerTerminateOutput)
    setHandler(outFail, eagerTerminateOutput)
  }
}

This basic idea works, and I can see that it gives me total control of the process. But for decision logic this trivial, is there a simpler, higher-level way to create a "widget" that can be included in my GraphDSL?

Upvotes: 3

Views: 720

Answers (1)

kiritsuku

Reputation: 53358

This answer is based on akka-stream version 2.4.2-RC1. The API can be slightly different in other versions. The dependency can be consumed by sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"

You can easily build your own abstraction of that logic. Instead of hardcoding the element type in your PassFilter class, you could make it a type parameter. And instead of hardcoding the function that determines the output port, you could pass it as a constructor argument. That gives you a reusable component that can be connected to arbitrary streams. Luckily, Akka already provides such a component. It is called Partition:

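import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{GraphDSL, Partition, Sink, Source}

// run() needs an implicit materializer in scope
implicit val system = ActorSystem("partition-demo")
implicit val materializer = ActorMaterializer()

val shape = GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._

  val first = b.add(Sink.foreach[Int](elem ⇒ println("even:\t" + elem)))
  val second = b.add(Sink.foreach[Int](elem ⇒ println("odd:\t" + elem)))
  // 2 output ports; the function returns the index of the port to use
  val p = b.add(Partition[Int](2, elem ⇒ if (elem%2 == 0) 0 else 1))

  p ~> first  // port 0
  p ~> second // port 1

  SinkShape(p.in)
}
Source(1 to 5).to(shape).run()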

/*
This should print:
odd:    1
even:   2
odd:    3
even:   4
odd:    5
*/

The Partition component also takes the number of output ports as an argument, which makes it even more reusable. With the ~> operator you can connect the output ports to other components, as I did in the example. You can of course leave that out and expose both output ports through a FanOutShape2 instead.
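Applied to the pass/fail case from the question, exposing both ports through a FanOutShape2 could look roughly like the sketch below. It is only an illustration under a few assumptions: OutputWrapper is a hypothetical stand-in for the question's type, the names PassFilterSketch, PassFilterDemo and passFilter are made up here, and the demo part assumes an Akka version where ActorMaterializer is still the way to obtain a materializer (2.4.x / 2.5.x).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}

object PassFilterSketch {
  // Hypothetical stand-in for the OutputWrapper type used in the question.
  final case class OutputWrapper(value: String)

  type In = (Boolean, Option[OutputWrapper])

  // Reusable graph: out0 carries passed wrappers, out1 carries a NotUsed marker per failure.
  val passFilter: Graph[FanOutShape2[In, OutputWrapper, NotUsed], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Port 0 = pass (flag set and wrapper present), port 1 = fail (everything else).
      val partition = b.add(Partition[In](2, {
        case (true, Some(_)) => 0
        case _               => 1
      }))
      // Unwrap the Option on the pass side; only Some(...) values are routed here.
      val pass = b.add(Flow[In].collect { case (_, Some(wrapper)) => wrapper })
      // Replace each failed element with NotUsed, as in the question's stage.
      val fail = b.add(Flow[In].map[NotUsed](_ => NotUsed))

      partition.out(0) ~> pass
      partition.out(1) ~> fail

      new FanOutShape2(partition.in, pass.out, fail.out)
    }
}

object PassFilterDemo extends App {
  import PassFilterSketch._

  implicit val system: ActorSystem = ActorSystem("pass-filter-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val inputs: List[In] = List(
    (true, Some(OutputWrapper("a"))),
    (false, Some(OutputWrapper("b"))),
    (true, None)
  )

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val filter = b.add(passFilter)
    Source(inputs) ~> filter.in
    filter.out0 ~> Sink.foreach[OutputWrapper](w => println("pass: " + w))
    filter.out1 ~> Sink.foreach[NotUsed](_ => println("fail"))
    ClosedShape
  }).run()
  // The actor system keeps running; call system.terminate() once the stream is done.
}
```

Because Partition and the two small flows are built-in stages, this version has no handlers of its own, so demand and completion are handled by the library rather than by hand-written pull/push calls.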

Upvotes: 4
