Vagif Abilov

Reputation: 9991

How to retrieve all journal events using Akka persistence queries?

Akka persistence queries have the following predefined operations:

- EventsByPersistenceId
- EventsByTag
- CurrentEventsByPersistenceId
- CurrentEventsByTag
- AllPersistenceIds

But what if I need to get all past events, via some sort of CurrentEvents operation? I can't figure out how to implement it in terms of Akka persistence queries.

Upvotes: 1

Views: 519

Answers (1)

Stefano Bonetti

Reputation: 9023

I'm not very familiar with the persistence query module, but these operations all return Akka Streams sources, so you could try combining them as follows:

  def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))

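(flatMapMerge, which takes a breadth argument limiting how many substreams are consumed concurrently, is an alternative to flatMapConcat in case you'd like to parallelise this. Events from different persistence IDs will then be interleaved.)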
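
For completeness, here is a slightly fuller sketch of the same idea, with the imports and an explicit read journal parameter. It assumes the Scala DSL of Akka Persistence Query and a journal plugin that implements both CurrentPersistenceIdsQuery and CurrentEventsByPersistenceIdQuery. The names AllCurrentEvents, Journal and currentEventsParallel are made up for illustration, and I haven't run this against a real journal.

```scala
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.scaladsl.{CurrentEventsByPersistenceIdQuery, CurrentPersistenceIdsQuery}
import akka.stream.scaladsl.Source

// Hypothetical helper object, not part of Akka itself.
object AllCurrentEvents {

  // Assumption: the read journal in use supports both "current" queries.
  type Journal = CurrentPersistenceIdsQuery with CurrentEventsByPersistenceIdQuery

  // Streams the events of one persistence ID after another.
  // Both streams complete once the currently stored data has been emitted.
  def currentEvents(
      journal: Journal,
      fromSequenceNr: Long = 0L,
      toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] =
    journal
      .currentPersistenceIds()
      .flatMapConcat(id => journal.currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))

  // Same idea, but reads up to `breadth` persistence IDs concurrently.
  // Events of different IDs are interleaved in the resulting stream.
  def currentEventsParallel(journal: Journal, breadth: Int): Source[EventEnvelope, NotUsed] =
    journal
      .currentPersistenceIds()
      .flatMapMerge(breadth, id => journal.currentEventsByPersistenceId(id, 0L, Long.MaxValue))
}
```

You would obtain the journal through PersistenceQuery(system).readJournalFor with the identifier of whichever plugin you use. Keep in mind that ordering is only guaranteed per persistence ID, so neither variant gives you a global ordering of events across the whole journal.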

Upvotes: 2
