My Akka stream stops after a single element. Here's my stream:
val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
  FirehoseActor.props(
    auth = ...
  )
)

val ref = Flow[FirehoseActor.RawTweet]
  .map(r => ResponseParser.parseTweet(r.payload))
  .map { t => println("Received: " + t); t }
  .to(Sink.onComplete({
    case Success(_) => logger.info("Stream completed")
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}")
  }))
  .runWith(firehoseSource)
FirehoseActor connects to the Twitter firehose and buffers messages to a queue. When the actor receives a Request message, it takes the next element and emits it:
def receive = {
  case Request(_) =>
    logger.info("Received request for next firehose element")
    onNext(RawTweet(queue.take()))
}
The problem is that only a single tweet is printed to the console. The program doesn't exit or throw any errors, and none of the other logging statements I've sprinkled around are printed.
I thought the sink would keep applying pressure to pull elements through, but that doesn't seem to be the case, since neither of the messages in Sink.onComplete gets printed. I also tried using Sink.ignore, but that only printed a single element as well. The log message in the actor is only printed once, too.
What sink do I need to use to make it pull elements through the flow indefinitely?
Upvotes: 0
Views: 417
Ah, I should have respected totalDemand in my actor. This fixes the issue:
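def receive = {
  case Request(_) =>
    logger.info("Received request for next firehose element")
    // Keep emitting until the outstanding downstream demand is used up.
    // Note that queue.take() blocks while the queue is empty.
    while (totalDemand > 0) {
      onNext(RawTweet(queue.take()))
    }
}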
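I was expecting to receive a Request for each element in the stream, but apparently a single Request(n) can signal demand for several elements at once. After emitting only one element, the downstream still had unmet demand and had no reason to send another Request, so the stream stalled. The actor has to keep calling onNext until totalDemand drops to zero.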
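To make the demand accounting concrete, here is a minimal sketch in plain Scala. It is a toy model and does not use Akka at all: DemandModel, emitOnePerRequest and emitWhileDemand are hypothetical names made up for illustration, and the only assumption carried over from the real API is that a request adds n to the outstanding demand and each onNext uses up one unit of it.

```scala
// Toy model of demand accounting; this is NOT Akka's API.
// DemandModel, emitOnePerRequest and emitWhileDemand are hypothetical names.
import scala.collection.mutable

final class DemandModel(queue: mutable.Queue[String]) {
  private var totalDemand = 0L
  val emitted = mutable.ArrayBuffer.empty[String]

  // Emitting one element consumes one unit of outstanding demand.
  private def onNext(s: String): Unit = {
    totalDemand -= 1
    emitted += s
  }

  // Mirrors the original receive: one element per request, whatever n is.
  def emitOnePerRequest(n: Long): Unit = {
    totalDemand += n
    if (queue.nonEmpty) onNext(queue.dequeue())
  }

  // Mirrors the fix: keep emitting until the demand is used up.
  // Unlike queue.take() in the actor, this stops when the queue is empty.
  def emitWhileDemand(n: Long): Unit = {
    totalDemand += n
    while (totalDemand > 0 && queue.nonEmpty) onNext(queue.dequeue())
  }
}

object DemandDemo extends App {
  val tweets = Seq("t1", "t2", "t3", "t4", "t5")

  val stalled = new DemandModel(mutable.Queue(tweets: _*))
  stalled.emitOnePerRequest(4)  // downstream asked for 4 but receives 1
  println(stalled.emitted.size) // 1

  val fixed = new DemandModel(mutable.Queue(tweets: _*))
  fixed.emitWhileDemand(4)      // all 4 requested elements are delivered
  println(fixed.emitted.size)   // 4
}
```

In the stalled case the downstream is still owed three elements, so it has no reason to ask again, which matches the single log line and single tweet described in the question.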
Upvotes: 0