Reputation: 1381
I'm building the query side of a CQRS application with Akka actors.
The query actors are set up as a cluster shard and are filled with events from a single Persistence Query stream.
My questions are:
If one of the actors in the cluster shard restarts, how do I recover its state?
If the actor that is filled from the Persistence Query restarts, how can I cancel the current Persistence Query (PQ) and start it again?
Upvotes: 1
Views: 692
Reputation: 2173
As discussed, I would evaluate persisting your query side in a database.
If that is not an option and you want to stick with your single persistence query per shard, do the following in your query actor:
import akka.Done
import akka.actor.{Actor, Stash}
import akka.stream.scaladsl.Sink

// Inside your query actor. It must mix in Stash (extends Actor with Stash)
// and have an implicit Materializer in scope for runWith.
var inRecovery: Boolean = true

override def preStart(): Unit = {
  // Subscribe to your live event stream now, so you don't miss anything during recovery,
  // e.g. send a subscription message to your persistence query actor.

  // Re-read everything up to now for recovery.
  readJournal.currentEventsByPersistenceId("persistenceId", 0L, Long.MaxValue)
    .map(Replay.apply) // mark your replay messages
    // Send all replay events to self, followed by Done once the finite stream completes.
    // Using the sink's completion message keeps Done ordered after the last Replay.
    // (Akka 2.6+ prefers the overload that also takes an onFailureMessage.)
    .runWith(Sink.actorRef(self, Done))
}

override def receive: Receive = {
  case Done => // recovery is finished
    inRecovery = false
    unstashAll() // unstash all normal messages received during recovery
  case Replay(payload) =>
    // handle replayed messages
  case event: Event =>
    // normal events from your persistence query
    if (inRecovery) stash() // stash normal messages until recovery is done
    else {
      // recovery is done, start handling normal events
    }
}

case class Replay(payload: AnyRef)
So basically, before the actor starts, subscribe to the persistence query actor and recover the state with a finite stream of all past events, which terminates after all events have passed through. During recovery, stash all incoming events that are not replayed events. After recovery is done, unstash everything and start handling normal messages.
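One consequence of subscribing before the replay starts is that an event can arrive twice: once from the live stream (stashed) and once from the replay. The following is a minimal, dependency-free sketch of the stash-and-replay ordering, not Akka code. `Envelope`, `RecoveringView`, and the sequence-number check are hypothetical names and an assumption on my part; the check only works if all events come from a single persistence id with increasing sequence numbers.

```scala
// Dependency-free model of the recovery protocol; all names are hypothetical.
final case class Envelope(sequenceNr: Long, payload: String)

final class RecoveringView {
  private var inRecovery = true
  private var lastSeq = 0L
  private val stashed = scala.collection.mutable.Queue.empty[Envelope]
  private val appliedBuf = scala.collection.mutable.ArrayBuffer.empty[String]

  def applied: List[String] = appliedBuf.toList

  // Apply an event once; anything at or below lastSeq was already seen.
  private def applyOnce(e: Envelope): Unit =
    if (e.sequenceNr > lastSeq) {
      appliedBuf += e.payload
      lastSeq = e.sequenceNr
    }

  // Events from the finite replay stream are applied immediately.
  def onReplay(e: Envelope): Unit = applyOnce(e)

  // Live events wait in the stash until recovery has finished.
  def onLive(e: Envelope): Unit =
    if (inRecovery) stashed.enqueue(e) else applyOnce(e)

  // Corresponds to receiving Done: drain the stash in arrival order.
  def onRecoveryDone(): Unit = {
    inRecovery = false
    while (stashed.nonEmpty) applyOnce(stashed.dequeue())
  }
}

object RecoveryDemo extends App {
  val view = new RecoveringView
  view.onLive(Envelope(3, "c"))   // arrives during recovery, gets stashed
  view.onReplay(Envelope(1, "a"))
  view.onReplay(Envelope(2, "b"))
  view.onReplay(Envelope(3, "c")) // the replay also contains event 3
  view.onLive(Envelope(4, "d"))   // still in recovery, stashed
  view.onRecoveryDone()
  println(view.applied)           // List(a, b, c, d)
}
```

In the actor, the same check would go wherever you handle `Replay` and normal events. If your event handling is already idempotent, you may not need it.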
Upvotes: 2