Azeli

Reputation: 748

Akka - how to maintain state through a context switch

I intend to create a custom router that passes messages to all of an unknown number of dynamically created actors. I therefore need to maintain a list of "subscribers", similar to the idea shown here, but I cannot seem to keep the list through a context switch. I am having trouble understanding why the mutable HashSet I created gets reset after the context switch. Is there a way around this? The code:

class myRouter extends Actor {
 var subscribers = mutable.HashSet[ActorRef]()
 var waitingFor = subscribers

 def receive = receiveSubAndRequests

 def receiveSubAndRequests: Actor.Receive = {

  case RegisterWorker =>
   subscribers += sender() //this happens right after initiation
  case e: ComputationParams => //say these are guaranteed to come after all have subscribed
   subscribers.foreach(s => s ! ComputeIt) //perform some compute. this works!
   context.become(receiveResults(e.foo, sender())) //pass some state
 }  

 def receiveResults(num: Int, sendTo: ActorRef): Actor.Receive = {

  case result: Result =>
   waitingFor -= sender() //this works, meaning var was accessed
   //other stuff until all results are in

  case foo: OtherParams => //say we need to handle and initiate another ComputeIt
    // stuff
    subscribers.foreach(s => s ! ComputeIt) //!!subscribers is empty!!
    //other
  }
}

Upvotes: 1

Views: 417

Answers (1)

Shadowlands

Reputation: 15074

Your waitingFor field is simply a second reference to subscribers, so when the line waitingFor -= sender() is executed, the sender is effectively removed from subscribers as well (both names refer to the same mutable HashSet). Nothing is reset by context.become itself. I think what you are looking for is to initialise waitingFor when you kick off the calculation:

case e: ComputationParams => //say these are guaranteed to come after all have subscribed
  subscribers.foreach(s => s ! ComputeIt) //perform some compute. this works!
  waitingFor = subscribers.clone() // *** independent copy of the current subscribers
  context.become(receiveResults(e.foo, sender())) //pass some state

At the line marked *** you (re-)create waitingFor as a new HashSet initialised to hold the same ActorRef values as the subscribers HashSet. You probably need to do this as well when you process case foo: OtherParams => in the "receiveResults" state.
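To see the difference outside of Akka, here is a minimal sketch of the aliasing problem and the copy-based fix. The object name AliasVsCopy and the two helper methods are made up for illustration, and plain strings stand in for the ActorRef values so that it runs without any Akka dependency:

```scala
import scala.collection.mutable

object AliasVsCopy {
  // Hypothetical stand-in: strings play the role of ActorRef subscribers.

  // waitingFor is just another name for the same set, as in the question.
  def aliasDemo(): (Int, Int) = {
    val subscribers = mutable.HashSet("a", "b", "c")
    val waitingFor = subscribers          // same object, second reference
    waitingFor -= "a"                     // also removes "a" from subscribers
    (subscribers.size, waitingFor.size)
  }

  // waitingFor is an independent copy, as suggested at the *** line.
  def copyDemo(): (Int, Int) = {
    val subscribers = mutable.HashSet("a", "b", "c")
    val waitingFor = subscribers.clone()  // new set with the same elements
    waitingFor -= "a"                     // subscribers is left untouched
    (subscribers.size, waitingFor.size)
  }

  def main(args: Array[String]): Unit = {
    println(aliasDemo()) // prints (2,2)
    println(copyDemo())  // prints (3,2)
  }
}
```

With the alias, every removal from waitingFor drains subscribers too, which is why it looks empty by the time OtherParams arrives. With the copy, subscribers keeps all registered workers between computations.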

Upvotes: 1
