Reputation: 93
I'm trying to catch 'Terminate Signal' from child to parent actor, however among pool of deadletter messages, the signal fails to arrive on parent actor's queue. What's the best way to resolve this?
Here's the snippet code I'm working on:
class MinerActor extends Actor {
var count:Int = 0
def receive = {
case Mine =>
//some task here
//if success
count = count + 1
//
if (count >= 100)
{
context.stop(self)
}
}
class MasterActor extends Actor {
val miner = context.actorOf(Props(new MinerActor,name = "miner")
context.watch(miner)
def receive = {
case Foo =>
while (true) {
miner ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
context.stop(self)
context.system.shutdown
}
}
Here the 'Terminated(miner)' case never gets called. Instead on stdout I see lots of dead-letter messages sent from Master to Miner (which is kind of expected as miner actor gets stopped). However how to prioritize the Terminate signal on the Event bus so as to reach Master Actor?
If I limit while loop to some 200 instead of infinity, after 100 deadletter messages, I receive Terminate Signal which prints "Miner Terminated!!". But how to achieve this when while loop being in infinity?
I'm new to Scala/Akka programming, my main aim here is to run '//some task' for 100 successful times and then exit the whole program. Is this a good way to achieve that task?
Upvotes: 1
Views: 211
Reputation: 26597
Replace:
case Foo =>
while (true) {
miner ! Mine
}
with
case Foo =>
miner ! Mine
self forward Foo
Upvotes: 4
Reputation: 13346
The problem is that you're infinite while loop is blocking the actor thread. As a consequence, your master actor is always stuck in processing the first arrived Foo
message and will never process any other messages in the mailbox. The reason for this is that there is only a single thread which is responsible for receiving the messages. This has some really nice implications because you basically don't have to worry about concurrency within a single actor if your state changes only happen within this thread.
There are multiple ways to solve this problem. I'd recommend using the scheduler to schedule a repeated task.
class MasterActor extends Actor {
var minerOption: Option[ActorRef] = None
var mineMessageOption: Option[Cancellable] = None
override def preStart: Unit = {
minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner")))
minerOption.foreach(context.watch(_))
import context.dispatcher
mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo))
}
def receive = {
case Foo =>
minerOption.foreach {
_ ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
mineMessageOption.foreach(_.cancel())
context.stop(self)
context.system.shutdown
}
}
In the schedule
call you can define the interval of the message Foo
and, thus, how many messages will be sent to the miner.
Upvotes: 2