Oli
Oli

Reputation: 1142

Akka Streams cycle example in docs not working

I'm trying to build an Akka Stream with a simple cycle in it. After reading the documentation here and having no luck with it I tried to just copy the example code as a starting base, but that also doesn't work. The code compiles (after including a source which is missing from the example) but nothing is printed out. It looks as though something is backpressuring for ever but I don't understand why.

Here's my code, any help would be much appreciated:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl._
import akka.stream.ClosedShape

object Simulate {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {

    // Define simulation flowgraph
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import b._
      import GraphDSL.Implicits._

      val source  = add(Source.repeat[Int](1))
      val zip     = add(ZipWith[Int, Int, Int]((left, right) => left))
      val bcast   = add(Broadcast[Int](2))
      val concat  = add(Concat[Int]())
      val start   = add(Source.single[Int](0))
      val sink    = add(Sink.ignore)

      source ~> zip.in0
                zip.out.map { s => println(s); s } ~> bcast ~> sink
                            concat        <~          bcast
                zip.in1 <~  concat        <~          start
      ClosedShape
    })

    g.run()

  }
}

Upvotes: 0

Views: 449

Answers (1)

manub
manub

Reputation: 4100

EDIT: it actually seems that the problem is not adding a buffer, but the order inlets / outlets were declared.

This works:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = Source.repeat(1)
  val start = Source.single(0)

  val zip = b.add(ZipWith((left: Int, right: Int) => left))
  val bcast = b.add(Broadcast[Int](2))
  val concat = b.add(Concat[Int]())

  source ~> zip.in0
            zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
            zip.in1 <~ concat <~ start
                       concat <~ bcast
  ClosedShape
})

g.run()

The order of zip.in1 <~ concat <~ start and concat <~ bcast is consistent with what's on the docs.

Upvotes: 1

Related Questions