Yaroslav
Yaroslav

Reputation: 4669

Akka persist on recovery completed updates state after first message

I have a persistent actor. When it starts for the first time (database is empty) I persist some initial data. But the state doesn't get updated as I would expect. It gets updated only after first message was handled. How can I get the actor to begin handle messages after state update?

Actor code

class TestActor extends PersistentActor {
  var numberOfEvents = 0

  def updateState(e: Any): Unit = {
    println("updating")
    numberOfEvents += 1
  }

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      if (numberOfEvents == 0) {
        println("persisting")
        persist("foo")(updateState)
      }
  }

  override def receiveCommand: Receive = {
    case _ => {
      println("answering")
      sender ! numberOfEvents
    }
  }
}

Test code

Await.result(actorRef ? "stats", Duration.Inf) shouldBe 0 // I wan't 1 here
Await.result(actorRef ? "stats", Duration.Inf) shouldBe 1

Output

persisting
answering // why this goes before updating?
updating
answering

Full code

Upvotes: 2

Views: 1171

Answers (1)

Al Iacovella
Al Iacovella

Reputation: 456

One thing you want to reconsider is that you would typically not be updating the state on the RecoveryCompleted event but rather handle the events that you persisted to reconstitute the state. The RecoveryCompleted message is for handling what to do at the end of recovery. These events will be the events replayed from the journal you had persisted in. Optionally you will also get Snapshot events if you are using snapshotting.

For example:

override def receiveRecover: Receive = {
    case Added(num) =>
        updateState(num) 

    case SnapshotOffer(metadata, snapshot) ⇒
       // Restore your full state from the data in the snapshot

    case RecoveryCompleted =>
        println("Recovery completed") // use logger here
  }

The receiveCommand, on the other hand is used to handle incoming commands and persisting those events then updating the internal state after those events have been updated.

override def receiveCommand: Receive = {
    case Add(num) => {
      println("received event and persisting")

      persist(Added(num){ evt ⇒ 
        // This gets called after the persist succeeds
        updateState(num)
        sender ! numberOfEvents
      }      
    }
  }

def updateState(e: Int): Unit = {
    println("updating")
    numberOfEvents += e
  }

In terms of messaging I find it useful to name the event as the past tense of the command as you can see below:

// Events

case class Added(v:Int)

// Commands

case class Add(v:Int)

Hopefully this makes it a little clearer.

Upvotes: 1

Related Questions