Reputation: 941
I have a queue of N messages dispatched by an actor, and I want to consume them all. The actor returns either a Message or, if the queue is empty, a NoMessages.
I came up with the code below, but it doesn't feel idiomatic, and I'm not sure how many threads I spin up every time I call consume().
What is a better way of doing this?
def main(): Unit = {
  val queue = system.actorOf(...)

  def consume(): Unit = {
    ask(queue, Read) foreach {
      case Message(m) => {
        // handle message
        consume()
      }
      case NoMessages => {
        system.shutdown()
      }
    }
  }

  consume()
}
Upvotes: 0
Views: 400
Reputation: 19527
If Message and NoMessages extend a common trait (let's call it Msg), you could use Akka Streams:
import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
import scala.concurrent._
import scala.concurrent.duration._

implicit val system = ActorSystem("QueueSys")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val queue = system.actorOf(...)
def handleMessage(msg: Message): Unit = ???

// the ask stage needs an implicit timeout for each request
implicit val askTimeout = Timeout(5.seconds)

val stream: Future[Done] = Source.fromIterator(() => Iterator.continually(Read))
  .ask[Msg](parallelism = 3)(queue) // adjust the parallelism as needed
  .takeWhile(_.isInstanceOf[Message]) // stop at the first non-Message reply
  .collect { case m: Message => m } // narrow Msg to Message so handleMessage type-checks
  .runForeach(handleMessage)

// shut the actor system down once the stream has completed
stream.onComplete(_ => system.terminate())
The above stream continually sends Read messages to the queue actor and processes the Message responses until the actor replies with NoMessages.
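The stop-at-the-first-NoMessages logic can be tried out without an actor system. Below is a minimal sketch, assuming a hypothetical in-memory FakeQueue in place of the queue actor and assumed definitions for Msg, Message and NoMessages. It mirrors the takeWhile step with a plain Scala Iterator, so it shows the control flow only and is not the Akka Streams API:

```scala
import scala.collection.mutable

// Hypothetical message protocol: the names mirror the question, the definitions are assumed.
sealed trait Msg
final case class Message(body: String) extends Msg
case object NoMessages extends Msg

object DrainSketch {
  // Stand-in for the queue actor: returns the next message, or NoMessages once empty.
  final class FakeQueue(items: Seq[String]) {
    private val pending = mutable.Queue(items: _*)
    def read(): Msg =
      if (pending.nonEmpty) Message(pending.dequeue()) else NoMessages
  }

  // Keep reading until the first NoMessages reply; return the message bodies in order.
  def drain(queue: FakeQueue): List[String] =
    Iterator
      .continually(queue.read())
      .takeWhile(_ != NoMessages)
      .collect { case Message(body) => body }
      .toList

  def main(args: Array[String]): Unit = {
    val handled = drain(new FakeQueue(Seq("a", "b", "c")))
    println(handled) // prints List(a, b, c)
  }
}
```

Because the Iterator is lazy, read() is called only as often as needed: once per message, plus one final call that returns NoMessages and ends the loop.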
Upvotes: 6