simonl

Reputation: 1240

Implementing a "live" stream to drive an Akka 2.4 Persistence Query

I have been investigating the experimental Akka Persistence Query module and am very interested in implementing a custom read journal for my application. The documentation describes two main flavors of queries: those that return the current state of the journal (e.g. CurrentPersistenceIdsQuery) and those that return a subscribable stream, emitting events as they are committed to the journal via the write side of the application (e.g. AllPersistenceIdsQuery).

For my contrived application, I am using Postgres and Slick 3.1.1 to drive the guts of these queries. I can successfully stream database query results by doing something like:

override def allPersistenceIds = {
  // Assumes: import akka.stream.scaladsl.Source
  //          import slick.driver.PostgresDriver.api._
  val db = Database.forConfig("postgres")
  val metadata = TableQuery[Metadata]

  val query = for (m <- metadata) yield m.persistenceId
  Source.fromPublisher(db.stream(query.result))
}

However, the stream is signaled as complete as soon as the underlying Slick DB action is completed. This doesn't seem to fulfill the requirement of a perpetually open stream that is capable of emitting new events.

My question is: how can I implement a perpetually open stream over the journal that continues to emit new events as they are committed?

Thanks!

Upvotes: 3

Views: 521

Answers (2)

Iurii Ant

Reputation: 1025

One can use the Postgres replication API to get an 'infinite' stream of database events. It is supported by the Postgres JDBC driver starting from version 42.0.0, see the related pull request. However, it's not a true push stream, but rather a buffered synchronous reader over the database WAL.

// Requires a connection opened with the replication=database property
// and an existing logical replication slot ("test_decoding" here).
PGReplicationStream stream =
    pgConnection
        .getReplicationAPI()   // replication entry point in pgjdbc >= 42.0.0
        .replicationStream()
        .logical()
        .withSlotName("test_decoding")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();
while (true) {
  ByteBuffer buffer = stream.read(); // blocks until the next WAL chunk arrives
  // process logical changes
}

It would be nice to have an Akka Streams adapter (Source) for this reader in the Alpakka project.
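To illustrate the adapter idea without any Akka or Postgres dependency, here is a minimal, library-free sketch: the blocking read() call is wrapped as a never-ending Iterator, which an Akka Streams Source could then consume via Source.fromIterator (ideally on a dedicated blocking-IO dispatcher). The BlockingQueue here is only a stand-in for PGReplicationStream; all names are illustrative, not pgjdbc or Akka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReplicationIteratorSketch {

  // Stand-in for PGReplicationStream: a blocking queue simulates stream.read().
  static final BlockingQueue<ByteBuffer> wal = new ArrayBlockingQueue<>(16);

  // Wraps the blocking reader as an infinite Iterator of decoded changes.
  static Iterator<String> changes() {
    return new Iterator<String>() {
      public boolean hasNext() { return true; } // replication never "ends"
      public String next() {
        try {
          ByteBuffer buffer = wal.take(); // blocks, like stream.read()
          return StandardCharsets.UTF_8.decode(buffer).toString();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  public static void main(String[] args) throws Exception {
    wal.put(ByteBuffer.wrap("change-1".getBytes(StandardCharsets.UTF_8)));
    Iterator<String> it = changes();
    System.out.println(it.next()); // prints "change-1"
  }
}
```

In a real adapter the blocking take() would be the WAL read, and backpressure would come for free because fromIterator only calls next() on downstream demand.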

Upvotes: 0

It's not as trivial as that one line of code; however, you're on the right track already.

In order to implement an "infinite" stream you'll need to query multiple times – i.e. implement polling – unless the underlying database allows for an infinite query (which it does not here, AFAICS).

The polling needs to keep track of the "offset": if you're querying by some tag and you issue another poll, you need to start that second query from the "last emitted element", not from the beginning of the table again. So you need something, most likely an Actor, that keeps this offset.
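The offset-tracking poll described above can be sketched without any Akka or database dependency. This is a hypothetical illustration only: the in-memory list stands in for the events table, and the offset field stands in for the state an Actor (or stream stage) would keep between polls.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetPollerSketch {

  static final List<String> journal = new ArrayList<>(); // stand-in for the events table
  static long offset = 0; // index of the next row to emit

  // One poll: a SELECT-like read of everything after the stored offset.
  static List<String> poll() {
    List<String> batch = new ArrayList<>(journal.subList((int) offset, journal.size()));
    offset += batch.size(); // remember where the next poll should resume
    return batch;
  }

  public static void main(String[] args) {
    journal.add("event-1");
    journal.add("event-2");
    System.out.println(poll()); // [event-1, event-2]
    journal.add("event-3");     // written after the first poll
    System.out.println(poll()); // [event-3] -- only the new event is emitted
  }
}
```

In the real plugin each poll would be a Slick query with a WHERE clause on a monotonically increasing sequence column, scheduled on a timer, with the batches flattened into the downstream Source.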

The Query Side LevelDB plugin is not the best role model for other implementations, as it assumes a lot about the underlying journal and how it works. Also, LevelDB is not meant for production use with Akka Persistence – it's a journal we ship so that you have a persistent journal to play around with out of the box (without starting Cassandra etc.).

If you're looking for inspiration, the MongoDB plugins should actually be a pretty good source, as they have very similar limitations to the SQL stores. I'm not sure whether any of the SQL journals currently implement the query side.

Upvotes: 4
