Vijay Patel

Reputation: 57

Akka persistence receiveRecover receives snapshots that are from other actor instances

I am experiencing unexpected behaviour when using Akka persistence. I am fairly new to Akka so apologies in advance if I have missed something obvious.

I have an actor called PCNProcessor. I create an actor instance for every PCN id I have. The problem I experience is that when I create the first actor instance, all works fine and I receive the Processed response. However, when I create further PCNProcessor instances using different PCN ids, I get the Already processed PCN response.

Essentially, for some reason the snapshot stored as part of the first PCN id processor is reapplied to the subsequent PCN id instances even though it does not relate to that PCN and the PCN id is different. To confirm this behaviour, I printed out a log in the receiveRecover, and every subsequent PCNProcessor instance receives snapshots that do not belong to it.

My question is one of the following:

  1. Should I be storing the snapshots in a specific way so that they are keyed against the PCN id, and then filtering out snapshots that are not related to the PCN in context?
  2. Or should the Akka framework be taking care of this behind the scenes, so that I do not have to worry about storing snapshots against the PCN id?

Source code for the actor is below. I do use sharding.


package com.abc.pcn.core.actors

import java.util.UUID

import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}

object PCNProcessor {

  import akka.contrib.pattern.ShardRegion
  import com.abc.pcn.core.PCN

  // shard settings
  val shardName = "pcn"
  val idExtractor: ShardRegion.IdExtractor = {
    case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
  }
  val shardResolver: ShardRegion.ShardResolver = {
    case ProcessPCN(pcn) => pcn.id.toString
  }

  // actor props
  def props = Props(classOf[PCNProcessor])

  // command and response
  case class ProcessPCN(pcn: PCN)

  case class NotProcessed(reason: String)

  case object Processed

}

class PCNProcessor
  extends PersistentActor
  with AtLeastOnceDelivery
  with AutoPassivation
  with ActorLogging {

  import com.abc.pcn.core.actors.PCNProcessor._

  import scala.concurrent.duration._

  context.setReceiveTimeout(10.seconds)

  private val pcnId = UUID.fromString(self.path.name)
  private var state: String = "not started"

  override def persistenceId: String = "pcn-processor-${pcnId.toString}"

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, s: String) =>
      log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
      state = s
  }

  def receiveCommand: Receive = withPassivation {

    case ProcessPCN(pcn)
      if state == "processed" =>
      sender ! Left(NotProcessed("Already processed PCN"))

    case ProcessPCN(pcn)
      if pcn.name.isEmpty =>
      val error: String = "Name is invalid"
      persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
        state = "invalid"
        saveSnapshot(state)
        sender ! Left(NotProcessed(error))
      }

    case ProcessPCN(pcn) =>
      persist(PCNProcessedEvt(pcn.id)) { evt =>
        state = "processed"
        saveSnapshot(state)
        sender ! Right(Processed)
      }
  }
}

Update:

After logging the metadata of the received snapshot, I can see that the problem is the snapshotterId: it is not being resolved and is always set to the literal text pcn-processor-${pcnId.toString}, with the ${pcnId.toString} part left unsubstituted.

[INFO] [06/06/2015 09:10:00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5] Recovering. PCN ID: 16b3d4dd-9e0b-45de-8e32-de799d21e7c5, Metadata of snapshot SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)

Upvotes: 0

Views: 565

Answers (2)

Edoardo Vencia

Reputation: 58

I think you are misusing Scala's string interpolation feature.
Try it this way:

override def persistenceId: String = s"pcn-processor-${pcnId.toString}"

Please note the use of s before the string literal.
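
To make the difference concrete, here is a minimal standalone sketch (not the asker's actor; the object name InterpolationDemo is made up for illustration, and the UUID is the one from the log line in the question). It only compares what the two string forms evaluate to:

```scala
import java.util.UUID

// Hypothetical demo object, only for comparing the two string forms.
object InterpolationDemo {
  val pcnId: UUID = UUID.fromString("16b3d4dd-9e0b-45de-8e32-de799d21e7c5")

  // No s prefix: this is an ordinary string literal, so the
  // ${pcnId.toString} part is kept as plain text and never evaluated.
  val literalId: String = "pcn-processor-${pcnId.toString}"

  // With the s prefix the expression inside ${...} is evaluated
  // and its result is spliced into the string.
  val interpolatedId: String = s"pcn-processor-${pcnId.toString}"

  def main(args: Array[String]): Unit = {
    println(literalId)      // same text for every pcnId
    println(interpolatedId) // contains the actual UUID
  }
}
```

Since the first form yields the same text regardless of pcnId, every actor instance using it as its persistenceId would share one id, which is consistent with the snapshot metadata shown in the question.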

Upvotes: 3

Vijay Patel

Reputation: 57

OK, I fixed this by changing the persistence id to the following line:

override def persistenceId: String = "pcn-processor-" + pcnId.toString

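The original version, a plain string literal without the s prefix:

override def persistenceId: String = "pcn-processor-${pcnId.toString}"

is never interpolated, so every actor instance ends up with the same literal persistence id. That affects the journal as well as snapshots; I only noticed it for snapshots because receiveRecover handles nothing but SnapshotOffer.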

Upvotes: 0
