Dr.Khu

Reputation: 665

Events are not recovering in Akka 2.4.0 Persistence & Cassandra Journal Plugin 0.6

I am writing an app that uses Akka Persistence (version 2.4.0) and the Cassandra plugin (version 0.6, https://github.com/krasserm/akka-persistence-cassandra) to recover from failures. The events are stored in Cassandra with no issues; however, once I kill an actor so that the supervisor restarts it, the events are not received by receiveRecover.

The issue seems to be with the plugin itself: if I use shared LevelDB instead of Cassandra, the events are received during the recovery step.

Here is the implementation of my persistent actor:



    class SimplePersistentActor extends PersistentActor with ActorLogging {

      def persistenceId: String = context.self.path.name

      override def preRestart(cause: Throwable, msg: Option[Any]) = {
        log.debug(s"Restarting ${getClass.getSimpleName}")
        super.preRestart(cause, msg)
      }

      override def postStop() = {
        log.debug(s"Stopping ${getClass.getSimpleName}")
        super.postStop()
      }

      var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())

      def receiveCommand = {
        case msg @ TransactionStart(transactionId) =>
          persist(msg) { _ => }
          log.debug(s"Starting a transaction with id $transactionId")
          transactionData = Right(RunningTransactionData(transactionId, List()))

          /* Send a reply */
          sender() ! transactionId

        case msg @ TransactionData(data) =>
          persist(msg) { _ => }

          transactionData match {
            case Right(t: RunningTransactionData) =>
              val updatedTransaction = t.copy(data = t.data ::: List(data))
              log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
              transactionData = Right(updatedTransaction)

              /* Send a reply */
              sender() ! t.transactionId          

            case _ => log.error("Actor's transaction data is not initialized")
          }

        case TransactionEnd(transactionId) =>
          transactionData match {
            case Right(t: RunningTransactionData) =>
              log.debug(s"Ending a transaction with id ${t.transactionId}")
              transactionData = Left(UninitializedData())

              /* Send a reply */
              sender() ! t.transactionId

            case _ => log.error("Actor's transaction data is not initialized")
          }      

        case other =>
          log.debug(s"Unexpected event received: $other")
      }

      def receiveRecover = {
        case message =>
          log.debug(s"Recovery Step. Message $message received")
      }
    }


The code is the same in both cases described above. Has anyone seen this issue before?

Upvotes: 2

Views: 583

Answers (1)

David

Reputation: 1952

I've run into the same thing and found a fix that works for me. I know your question is pretty old, but since I was seeing the same problem on the latest version of Akka Cassandra Persistence (0.86), I thought it would be worth mentioning.

The problem I had came from the following config.

    cassandra-main-journal = ${cassandra-journal} {
      contact-points = ["localhost"]
      keyspace-autocreate = true
      tables-autocreate = true
      keyspace = "main_akka_journal"
    }

This takes the default cassandra-journal config and overrides the keyspace. The persistent actor is then pointed at this config by overriding journalPluginId (persistenceId only names the actor's event stream; it does not select a journal).
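For reference, Akka Persistence selects the journal config for an actor through `journalPluginId`. Here is a minimal sketch of an actor wired to the config above; the class name `MainJournalActor` and the logging are only illustrative assumptions, not the code from the question:

```scala
import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted}

// Sketch only: the class name is hypothetical; "cassandra-main-journal"
// is the config block shown above.
class MainJournalActor extends PersistentActor with ActorLogging {

  // Identifies this actor's event stream within the journal.
  override def persistenceId: String = self.path.name

  // Selects which journal plugin config block this actor uses.
  override def journalPluginId: String = "cassandra-main-journal"

  override def receiveCommand: Receive = {
    case msg =>
      persist(msg) { event => log.debug(s"Persisted $event") }
  }

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      log.debug(s"Recovery completed, lastSequenceNr = $lastSequenceNr")
    case event =>
      log.debug(s"Replayed $event")
  }
}
```

Logging `lastSequenceNr` on `RecoveryCompleted` makes it easy to spot the case where no events are replayed but the sequence number is non-zero.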

What happens if you do this is that all writes from the actor go to the main_akka_journal keyspace. On restarting the actor you then get a RecoveryCompleted message, but you don't see any of the events you had written. However, when you receive RecoveryCompleted, lastSequenceNr will be correct.

What's interesting is that if you have keyspace-autocreate=true you'll see that two keyspaces were created: main_akka_journal and akka.

So the problem is that the persistent actor writes to the main_akka_journal keyspace, but on restart it reads events from the akka keyspace (which is empty) and then reads lastSequenceNr from the main_akka_journal keyspace (which is correct).
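To picture the mismatch, here is a toy model in plain Scala. It is only an illustration: in-memory maps stand in for the two Cassandra keyspaces, and the object and method names are made up for this sketch rather than taken from the plugin.

```scala
import scala.collection.mutable

// Toy model only: plain Scala maps stand in for Cassandra keyspaces.
// None of these names are part of the plugin's API.
object KeyspaceMismatchSketch {
  type Keyspace = mutable.Map[String, Vector[String]]

  val keyspaces: Map[String, Keyspace] = Map(
    "main_akka_journal" -> mutable.Map.empty[String, Vector[String]],
    "akka"              -> mutable.Map.empty[String, Vector[String]]
  )

  // Append an event for the given persistence id in the given keyspace.
  def write(keyspace: String, persistenceId: String, event: String): Unit = {
    val ks = keyspaces(keyspace)
    ks(persistenceId) = ks.getOrElse(persistenceId, Vector.empty) :+ event
  }

  // Replay returns whatever events the given keyspace holds for the id.
  def replay(keyspace: String, persistenceId: String): Vector[String] =
    keyspaces(keyspace).getOrElse(persistenceId, Vector.empty)

  // In this model the highest sequence number is the number of stored events.
  def highestSequenceNr(keyspace: String, persistenceId: String): Long =
    replay(keyspace, persistenceId).size.toLong

  def main(args: Array[String]): Unit = {
    write("main_akka_journal", "tx-actor", "TransactionStart")
    write("main_akka_journal", "tx-actor", "TransactionData")

    // Misconfigured: replay reads the default keyspace, which holds nothing.
    println(replay("akka", "tx-actor"))
    // The sequence number comes from the keyspace that was written to.
    println(highestSequenceNr("main_akka_journal", "tx-actor"))
    // Once reads and writes agree on the keyspace, the events come back.
    println(replay("main_akka_journal", "tx-actor"))
  }
}
```

The first replay finds nothing while the sequence number is still non-zero, which matches the symptom of an empty recovery with a correct lastSequenceNr.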

The solution for me was this config:

    cassandra-main-journal = ${cassandra-journal} {
      contact-points = ["localhost"]
      keyspace-autocreate = true
      tables-autocreate = true
      keyspace = "main_akka_journal"
      query-plugin = "cassandra-main-query-plugin"
    }

    cassandra-main-query-plugin = ${cassandra-query-journal} {
      write-plugin = "cassandra-main-journal"
    }

Otherwise, by default, the write-plugin points to cassandra-journal and therefore to the akka keyspace.

Upvotes: 0
