laurids
laurids

Reputation: 941

Good Scala pattern to consume from a Queue until there are no more messages?

I have a queue of N messages dispatched by an Actor, I want to consume them all. The actor will return either a Message type or a NoMessages type if the queue is empty.

I came up with this but doesn't feel idiomatic, and I'm not sure about how many threads I'm spinning up every time I call consume()?

What is a better way of doing this?

def main(): Unit = {

  val queue = system.actorOf(...)

  def consume(): Unit = {
    ask(queue, Read) foreach {
      case Message(m) => {
        // handle message
        consume()
      }
      case NoMessages =>  {
        system.shutdown()
      }
    }
  }
  consume()
}

Upvotes: 0

Views: 400

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

If Message and NoMessages extend a common trait (let's call it Msg), you could use Akka Streams:

import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
import scala.concurrent._
import scala.concurrent.duration._

implicit val system = ActorSystem("QueueSys")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val queue = system.actorOf(...)

def handleMessage(msg: Message): Unit = ???

implicit val askTimeout = Timeout(5.seconds)

val stream: Future[Done] = Source.fromIterator(() => Iterator.continually(Read))
  .ask[Msg](parallelism = 3)(queue) // adjust the parallelism as needed
  .takeWhile(_.isInstanceOf[Message])
  .runForeach(handleMessage)

stream.onComplete(_ => system.terminate())

The above stream will continually send Read messages to the queue actor and process Message responses, until the actor replies with NoMessage.

Upvotes: 6

Related Questions