Reputation: 665
I try to write an app using that is using Akka (version 2.4.0) Persistency and Cassandra Plugin (version 0.6, https://github.com/krasserm/akka-persistence-cassandra) to recover from failures.
The events are being stored to cassandra with no issues, however, one I try to kill and actor, so the supervisor restarts it, the events are not received by receiveRecover
.
It seems that the issue is with the plugin itself, as if I use shared LevelDB instead of cassandra, the events are being received on the recovery step.
Here is the implementation of my persistent actor:
class SimplePersistentActor extends PersistentActor with ActorLogging {
def persistenceId: String = context.self.path.name
override def preRestart(cause: Throwable, msg: Option[Any]) = {
log.debug(s"Restarting ${getClass.getSimpleName}")
super.preRestart(cause, msg)
}
override def postStop() = {
log.debug(s"Stopping ${getClass.getSimpleName}")
super.postStop()
}
var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())
def receiveCommand ={
case msg @ TransactionStart(transactionId) =>
persist(msg) { _ => }
log.debug(s"Starting a transaction with id $transactionId")
transactionData = Right(RunningTransactionData(transactionId, List()))
/* Send a reply */
sender() ! transactionId
case msg @ TransactionData(data) =>
persist(msg) { _ => }
transactionData match {
case Right(t: RunningTransactionData) =>
val updatedTransaction = t.copy(data = t.data ::: List(data))
log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
transactionData = Right(updatedTransaction)
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case TransactionEnd(transactionId) =>
transactionData match {
case Right(t: RunningTransactionData) =>
log.debug(s"Ending a transaction with id ${t.transactionId}")
transactionData = Left(UninitializedData())
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case other =>
log.debug(s"Unexpected event received: $other")
}
def receiveRecover = {
case message =>
log.debug(s"Recovery Step. Message $message received")
}
}
In both cases, that I describe above, the code doesn't change. Has anyone seen this issue before?
Upvotes: 2
Views: 583
Reputation: 1952
I've run into the same thing and found a fix that works for me. I know your question is pretty old but since I was seeing the same problem on the latest version of Akka Cassandra Persistence (0.86) I thought it would be worth mentioning.
The problem I had came from the following config.
cassandra-main-journal = ${cassandra-journal} {
contact-points = ["localhost"]
keyspace-autocreate = true
tables-autocreate = true
keyspace = "main_akka_journal"
}
So, take the default cassandra-journal
config and override the keyspace
. Then, like you're doing, overriding persistenceId
in the Akka persistent actor to point to this config.
What happens if you do this is all writes to the Actor go the the main_akka_journal
keyspace. On restarting the Actor you then get a RecoveryCompleted
message but you don't see any of the messages you had written. However when you receive RecoveryCompleted
the lastSequenceNr
will be correct.
What's interesting is that if you have keyspace-autocreate=true
you'll see two keyspaces where created. main_akka_journal
and akka
.
So the problem is the persistent actor is writing to the main_akka_journal
keyspace, on restart it's reading events from the akka
keyspace (which is empty) and then reading the lastSequenceNr
from the main_akka_journal
keyspace (which is correct).
The solution for me was this config:
cassandra-main-journal = ${cassandra-journal} {
contact-points = ["localhost"]
keyspace-autocreate = true
tables-autocreate = true
keyspace = "main_akka_journal"
query-plugin = "cassandra-main-query-plugin"
}
cassandra-main-query-plugin = ${cassandra-query-journal} {
write-plugin = "cassandra-main-journal"
}
Otherwise by default the write-plugin
points to cassandra-journal
and the akka
keyspace.
Upvotes: 0