Reputation: 57
I am experiencing unexpected behaviour when using Akka persistence. I am fairly new to Akka so apologies in advance if I have missed something obvious.
I have an actor called PCNProcessor. I create an actor instance for every PCN id I have. The problem I experience is that when I create the first actor instance, all works fine and I receive the Processed response. However, when I create further PCNProcessor instances using different PCN ids, I get the Already processed PCN response.
Essentially, for some reason the snapshot stored as part of the first PCN id processor is reapplied to the subsequent PCN id instances even though it does not relate to that PCN and the PCN id is different. To confirm this behaviour, I printed out a log in the receiveRecover, and every subsequent PCNProcessor instance receives snapshots that do not belong to it.
My question is:
Source code for the actor is below. I do use sharding.
package com.abc.pcn.core.actors
import java.util.UUID
import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}
object PCNProcessor {
import akka.contrib.pattern.ShardRegion
import com.abc.pcn.core.PCN
val shardName = "pcn"
val idExtractor: ShardRegion.IdExtractor = {
case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
}
val shardResolver: ShardRegion.ShardResolver = {
case ProcessPCN(pcn) => pcn.id.toString
}
// shard settings
def props = Props(classOf[PCNProcessor])
// command and response
case class ProcessPCN(pcn: PCN)
case class NotProcessed(reason: String)
case object Processed
}
class PCNProcessor
extends PersistentActor
with AtLeastOnceDelivery
with AutoPassivation
with ActorLogging {
import com.abc.pcn.core.actors.PCNProcessor._
import scala.concurrent.duration._
context.setReceiveTimeout(10.seconds)
private val pcnId = UUID.fromString(self.path.name)
private var state: String = "not started"
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
override def receiveRecover: Receive = {
case SnapshotOffer(_, s: String) =>
log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
state = s
}
def receiveCommand: Receive = withPassivation {
case ProcessPCN(pcn)
if state == "processed" =>
sender ! Left(NotProcessed("Already processed PCN"))
case ProcessPCN(pcn)
if pcn.name.isEmpty =>
val error: String = "Name is invalid"
persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
state = "invalid"
saveSnapshot(state)
sender ! Left(NotProcessed(error))
}
case ProcessPCN(pcn) =>
persist(PCNProcessedEvt(pcn.id)) { evt =>
state = "processed"
saveSnapshot(state)
sender ! Right(Processed)
}
}
}
Update:
After logging out the metadata for the received snapshot, I can see the problem is that the snapshotterId is not resolving properly and is always being set to pcn-processor-${pcnId.toString} without resolving the bit in italics.
[INFO] [06/06/2015 09:10:00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5] Recovering. PCN ID: 16b3d4dd-9e0b-45de-8e32-de799d21e7c5, Metadata of snapshot SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)
Upvotes: 0
Views: 565
Reputation: 58
I think you are misusing the Scala string interpolation feature.
Try in the following way:
override def persistenceId: String = s"pcn-processor-${pcnId.toString}"
Please note the use of s
before the string literal.
Upvotes: 3
Reputation: 57
Ok fixed this by changing the persistence id to the following line:
override def persistenceId: String = "pcn-processor-" + pcnId.toString
The original in string version:
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
only works for persisting to journal but not for snapshots.
Upvotes: 0