jbrown

Reputation: 7996

Akka stream stops after one element

My Akka stream stops after a single element. Here's the stream:

val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
  FirehoseActor.props(
    auth = ...
  )
)

val ref = Flow[FirehoseActor.RawTweet]
  .map(r => ResponseParser.parseTweet(r.payload))
  .map { t => println("Received: " + t); t }
  .to(Sink.onComplete({
    case Success(_) => logger.info("Stream completed")
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}")
  }))
  .runWith(firehoseSource)

FirehoseActor connects to the Twitter firehose and buffers messages to a queue. When the actor receives a Request message, it takes the next element and returns it:

def receive = {
  case Request(_) =>
    logger.info("Received request for next firehose element")
    onNext(RawTweet(queue.take()))
}

The problem is that only a single tweet is printed to the console. The program doesn't quit or throw any errors, and none of the extra logging statements I've sprinkled around are printed.

I thought the sink would keep applying pressure to pull elements through, but that doesn't seem to be the case, since neither of the messages in Sink.onComplete gets printed. I also tried Sink.ignore, but that printed only a single element as well. The log message in the actor is also printed only once.

What sink do I need to use to make it pull elements through the flow indefinitely?

Upvotes: 0

Views: 417

Answers (1)

jbrown

Reputation: 7996

Ah I should have respected totalDemand in my actor. This fixes the issue:

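def receive = {
  case Request(_) =>
    logger.info("Received request for next firehose element")
    // A single Request(n) can ask for more than one element, so keep
    // emitting until the accumulated demand is used up.
    while (totalDemand > 0) {
      onNext(RawTweet(queue.take()))
    }
}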

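I was expecting to receive one Request per element, but a single Request(n) can signal demand for several elements at once. The actor has to keep calling onNext while totalDemand > 0; otherwise the downstream is still waiting for elements it already asked for and never sends another Request.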
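
To see why emitting one element per Request stalls the stream, here is a minimal, library-free sketch of the demand bookkeeping. It is not Akka code: DemandModel and its method names are made up for illustration, and it assumes the downstream signals its demand in one Request(n) with n > 1 and only asks again once that demand has been served.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the publisher's demand bookkeeping; not Akka API.
final class DemandModel(source: Iterator[String]) {
  private var totalDemand: Long = 0L
  val emitted: mutable.Buffer[String] = mutable.Buffer.empty[String]

  // Emitting an element uses up one unit of demand.
  private def onNext(element: String): Unit = {
    totalDemand -= 1
    emitted += element
  }

  // Behaviour from the question: one element per Request, whatever n is.
  def requestEmitOnce(n: Long): Unit = {
    totalDemand += n
    if (source.hasNext) onNext(source.next())
  }

  // Behaviour from the answer: keep emitting until the demand is used up.
  def requestDrainDemand(n: Long): Unit = {
    totalDemand += n
    while (totalDemand > 0 && source.hasNext) onNext(source.next())
  }

  def outstandingDemand: Long = totalDemand
}

object DemandDemo extends App {
  val tweets = List("t1", "t2", "t3", "t4")

  val once = new DemandModel(tweets.iterator)
  once.requestEmitOnce(4)
  println(s"emit once: ${once.emitted.mkString(", ")} (demand left: ${once.outstandingDemand})")

  val drain = new DemandModel(tweets.iterator)
  drain.requestDrainDemand(4)
  println(s"drain:     ${drain.emitted.mkString(", ")} (demand left: ${drain.outstandingDemand})")
}
```

In the first case three units of demand are left unserved after a single Request(4), which matches the symptom in the question: one tweet printed, then silence. Note also that if queue is a blocking queue, queue.take() inside the while loop will block the actor's thread whenever the buffer is empty, so this fix relies on the firehose keeping the queue filled.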

Upvotes: 0
