As As

Reputation: 2107

Can actors read messages under a certain condition?

I have this situation:

Now I wonder whether I can do the following things:

I don't know if I explained it well.

EDIT: Thank you, the answers are great, but I still have some doubts.

Upvotes: 3

Views: 237

Answers (5)

Sascha Kolberg

Reputation: 7162

You already have a lot of good answers, but I feel compelled to offer something briefer, since what you need is not necessarily a state machine:

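import akka.actor.Actor

case object Start

class ActorB extends Actor {
  // Begin with an empty cache
  def receive: Receive = caching(Nil)

  // The cache is a parameter of the behaviour, so no mutable field is needed
  def caching(cached: List[String]): Receive = {
    case msg: String =>
      // Prepend the new message and continue with the larger cache
      context.become(caching(msg :: cached))
    case Start =>
      // Messages were prepended, so reverse to print them in arrival order
      cached.reverse.foreach(println)
      context.become(caching(Nil))
  }
}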

Upvotes: 0

Till Rohrmann

Reputation: 13346

You can solve exactly this problem with the Stash trait and Akka's become/unbecome functionality. The idea is the following:

When you receive a Stop message, you switch to a behaviour that stashes all messages other than Start. When you receive a Start message, you switch to a behaviour that prints all received messages, and you also unstash all messages that arrived in the meantime.

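import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import scala.concurrent.duration._
import scala.language.postfixOps

case object Start
case object Stop
case object TriggerStateChange
case object SendMessage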

class ActorB extends Actor with Stash {
  override def receive: Receive = {
    case Start =>
      context.become(printingBehavior, false)
      unstashAll()
    case _ => stash()
  }

  def printingBehavior: Receive = {
    case msg: String => println(msg)
    case Stop => context.unbecome()
  }
}

class ActorA(val actorB: ActorRef) extends Actor {

  var counter = 0
  var started = false

  override def preStart: Unit = {
    import context.dispatcher

    this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange)
    this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage)
  }

  override def receive: Actor.Receive = {
    case SendMessage =>
      actorB ! "Message: " + counter
      counter += 1
    case TriggerStateChange =>
      actorB ! (if (started) {
        started = false
        Stop
      } else {
        started = true
        Start
      })
  }
}

object Akka {
  def main(args: Array[String]) = {
    val system = ActorSystem.create("TestActorSystem")

    val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB")
    val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA")

    system.awaitTermination()
  }
}

Upvotes: 2

Binzi Cao

Reputation: 1085

Some thoughts:

  1. Yes, if the boolean flag comes from a system resource such as a database or a config file. However, I don't think it should depend on an external message, given that the actor receives messages from multiple other actors. If ActorB is used only by ActorA, the two can be merged into one.

  2. Similar to 1: how would you handle messages from multiple actors? If there is only one ActorA, the two actors can be merged. If there are several, the flag can be stored in a database: ActorA changes the flag in the database to "Start" or "Stop", and ActorB prints or not based on the flag.

An actor should do its work independently of other actors' state. Start and Stop are really part of ActorA's state rather than ActorB's.

Upvotes: 0

izikovic

Reputation: 133

Check the Become/Unbecome functionality. It lets you change the behavior of the actor.

If I understood correctly, you want your ActorB to have two different states. In the first state, it should cache the messages it receives. In the second state, it should print all the cached messages and then print each new message as it arrives.

Something like this:

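import akka.actor.Actor
import scala.collection.immutable.Queue

case object Start
case object Stop
case class Message(value: String)

class ActorB extends Actor {

  // Messages received before Start, oldest first
  var cache = Queue.empty[String]

  def initial: Receive = {
    case Message(msg) => cache = cache.enqueue(msg)
    case Start =>
      // Re-send the cached messages to self so the printing behaviour handles them.
      // They are enqueued behind anything already waiting in the mailbox.
      for (m <- cache) self ! Message(m)
      cache = Queue.empty
      context.become(printing)
  }

  def printing: Receive = {
    case Message(msg) => println(msg)
    case Stop => context.become(initial) // or stop the actor
  }

  def receive = initial
}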

Upvotes: 2

Shadowlands

Reputation: 15074

Have Actor B alternate between two states (two different behaviours). In the initial 'pending' state, it waits for a 'start' message, while stashing any other messages.

On receipt of a 'start' message, unstash all the stored messages and become 'active', waiting for a 'stop' message and writing out the other messages received (which will include the unstashed ones). On receiving a 'stop' message, revert to the 'pending' behaviour.
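
To make the two states concrete, here is a minimal Akka-free sketch that models the pending/active transitions as a plain function. It is only an illustration, not actor code: `ActorBModel`, `Event`, `Text`, `Pending`, `Active`, `step`, and `run` are hypothetical names, and ignoring a 'stop' that arrives while already pending is an assumption of this sketch (a real Stash-based actor would stash it like any other message).

```scala
// Hypothetical model of the pending/active behaviour; no Akka involved.
sealed trait Event
case object Start extends Event
case object Stop extends Event
final case class Text(value: String) extends Event

sealed trait State
// Pending: hold messages (oldest first) until Start arrives.
final case class Pending(stashed: Vector[String]) extends State
// Active: messages are written out as they arrive.
case object Active extends State

object ActorBModel {
  // Returns the next state and the lines that would be written out for this event.
  def step(state: State, event: Event): (State, Vector[String]) = (state, event) match {
    case (Pending(stashed), Text(v)) => (Pending(stashed :+ v), Vector.empty)
    case (Pending(stashed), Start)   => (Active, stashed) // flush in arrival order
    case (Pending(_), Stop)          => (state, Vector.empty) // assumption: ignored
    case (Active, Text(v))           => (Active, Vector(v))
    case (Active, Stop)              => (Pending(Vector.empty), Vector.empty)
    case (Active, Start)             => (Active, Vector.empty) // already active
  }

  // Folds the events from the initial pending state and collects everything written out.
  def run(events: Seq[Event]): Vector[String] =
    events.foldLeft((Pending(Vector.empty): State, Vector.empty[String])) {
      case ((state, out), event) =>
        val (next, written) = step(state, event)
        (next, out ++ written)
    }._2
}
```

For example, `ActorBModel.run(Seq(Text("a"), Text("b"), Start, Text("c"), Stop, Text("d")))` yields `Vector("a", "b", "c")`: the first two messages are held until 'start', "c" is written immediately, and "d" stays pending after 'stop'.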

Upvotes: 0
