cokeSchlumpf

Reputation: 173

Akka Streams: Calling push(_,_) not within onPull(_,_) is blocking the stream - Why?

I'm having trouble understanding the behaviour of a small sample of a custom Akka Streams Source.

The idea behind the sample is that the Source should ask an Actor for the next element. See the code below:

class ActorSource[T](context: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("actor-source")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        val receivingActor = context.actorOf(Props(new ReceivingActor(msg => {
          push(out, msg)
          println("push - Done")
        })))

        override def onPull(): Unit = {
          actor ! Protocol.Pull(receivingActor)
          println("onPull - Done")
        }
      })
    }
  }

  override def shape: SourceShape[T] = SourceShape(out)

  /**
    * A small actor which receives new elements from the actual source actor.
    *
    * @param push The method to push elements into the stream
    */
  class ReceivingActor(push: T => Unit) extends Actor with ActorLogging with UnknownMessage {

    override def receive: Receive = {
      case Protocol.Push(msg) =>
        push(msg.asInstanceOf[T])  // I know this unchecked cast is evil; it is only here for the test
      case msg =>
        unknownMessage(msg)
    }

  }

}

object ActorSource {

  /**
    * Creates an [[ActorSource]]
    *
    * @param actor   The actor which acts as the source
    * @param context The context to create the internal helper actor
    * @return A new akka-streams source
    */
  def Source[T](actor: ActorRef)(implicit context: ActorRefFactory): AkkaSource[T, NotUsed] = {
    val graph: Graph[SourceShape[T], NotUsed] = new ActorSource[T](context, actor)
    AkkaSource.fromGraph(graph)
  }

  /**
    * Defines the messages/ events for the source actor
    */
  object Protocol {

    /**
      * Will be sent if the stream requires new elements.
      *
      * @param actor The actor which should receive the push message
      */
    case class Pull(actor: ActorRef)

    /**
      * Sent by the source actor to submit a new element.
      *
      * @param msg The message to put into the stream.
      */
    case class Push(msg: Any)

  }

}

If you create a stream with that source, for example like this:

class SampleActor extends Actor with ActorLogging with UnknownMessage {

  var counter = 0

  override def receive: Receive = {
    case msg @ Protocol.Pull(actor) =>
      actor ! Protocol.Push(counter)
      counter = counter + 1
  }

}

val sourceActor = system.actorOf(Props(new SampleActor()))

val stream = ActorSource
  .Source[Int](sourceActor)(system)
  .take(10)
  .runWith(Sink.foreach(println))

Await.result(stream, 30 seconds)

The only output is:

onPull - Done
push - Done

The first integer never reaches the Sink, and onPull is never called again. Interestingly, if I kill the program, the first integer is printed by the Sink.

I'm wondering whether this is a feature or a bug. From what I understood, one can call push(_, _) at any time after a pull has signalled that the outlet is open; isAvailable even returns true when I check it.

Can anyone explain that behaviour?

Upvotes: 1

Views: 439

Answers (1)

cokeSchlumpf

Reputation: 173

After reading more in the docs (RTFM ;)) I think I found the answer to my question.

It is mentioned in multiple places that it's not safe to call these API methods outside the related callbacks. For what I was trying to achieve, Akka Streams provides the getAsyncCallback method.

More details can be found here: https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-customize.html#using-asynchronous-side-channels
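For illustration, here is a minimal sketch of how the source from the question might look with getAsyncCallback. It is not taken from the docs. It assumes Akka 2.5.x and reuses the Pull/Push protocol from the question. The names Receiver, onElement, and factory are hypothetical, and the helper actor is now created in preStart and stopped in postStop. getAsyncCallback returns a handle whose invoke method can be called from any thread, and the body of the callback then runs inside the stage, where calling push is allowed.

```scala
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}

// Same messages as in the question, repeated so the sketch is self-contained.
object Protocol {
  final case class Pull(actor: ActorRef)
  final case class Push(msg: Any)
}

// Hypothetical helper actor: forwards every Push to the function it was given.
class Receiver[T](deliver: T => Unit) extends Actor {
  override def receive: Receive = {
    case Protocol.Push(msg) => deliver(msg.asInstanceOf[T]) // unchecked cast, as in the question
  }
}

class ActorSource[T](factory: ActorRefFactory, actor: ActorRef)
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("actor-source")

  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var receiver: ActorRef = _

      override def preStart(): Unit = {
        // The callback body is executed by the stage itself, so push is safe here.
        val onElement: AsyncCallback[T] = getAsyncCallback[T] { elem =>
          // Assumption: an element that arrives without pending demand is dropped.
          if (isAvailable(out)) push(out, elem)
        }
        // invoke may be called from the actor's thread.
        receiver = factory.actorOf(Props(new Receiver[T](elem => onElement.invoke(elem))))
      }

      // Stop the helper actor when the stage completes or fails.
      override def postStop(): Unit = factory.stop(receiver)

      setHandler(out, new OutHandler {
        override def onPull(): Unit = actor ! Protocol.Pull(receiver)
      })
    }
}
```

With this change, the stream from the question should emit elements as they arrive instead of stalling after the first push, because each element now enters the stage through the callback rather than through a direct push from the actor's thread.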

Upvotes: 0
