George

Akka.NET journal reader missing events

Our application uses Akka.NET with event sourcing. The persistent actors save their events in a SQL Server database. We also have view actors that subscribe to these events through a journal reader (persistence query) to build materialised views. A table in the database has one row per view actor, containing the name of the view actor and the offset of the last event it processed. At first sight this works smoothly. Sometimes, however, when we run a test that produces thousands of events, the journal reader misses some events.

A view actor is a ReceiveActor. When it starts, it retrieves the offset of the last handled event from the database (the call is made from the actor's constructor). The offset is piped to Self in an OffsetMessage. On receiving the OffsetMessage, the view actor initialises the journal reader. On receiving events (wrapped in EventEnvelope messages), it updates the views.

The action run for each element of the journal reader stream first writes a line containing the event offset to the log, then tells the envelope to the actor. The EventEnvelope receive handler also writes a log line containing the event offset.

We have a test that inserts 9,635 events into the journal. Sometimes both the journal reader and the EventEnvelope receive handler log fewer than 9,635 events. Both log the same events, so it seems the events are already missed by the journal reader, before they reach the actor. The events missing from the log correspond to the items missing from the views. We run the test on an empty database. Logging is at the debug level and shows no exceptions. The missing events (we have seen between 1 and 4 per run) can be among the first, middle or last events, and they are different every time.
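Comparing totals only tells how many events were lost. To see which ones, the Ordering values stored in the journal can be compared with the offsets logged by UpdateState. The C# sketch that follows is a hypothetical diagnostic helper (`MissedEventFinder` is not part of Akka.NET). It assumes the journal orderings have been read with a query along the lines of `SELECT Ordering FROM dbo.EventJournal WHERE Tags LIKE '%;Module;%'` and that the processed offsets have been parsed from the debug log.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical diagnostic helper (not an Akka.NET API): compares the Ordering values
// stored in the journal for one tag with the offsets a view actor actually logged.
public static class MissedEventFinder
{
    // Returns the journal orderings that never reached the view, in ascending order.
    public static IReadOnlyList<long> FindMissing(
        IEnumerable<long> journalOrderings,
        IEnumerable<long> processedOffsets)
    {
        var processed = new HashSet<long>(processedOffsets);
        return journalOrderings
            .Where(ordering => !processed.Contains(ordering))
            .Distinct()
            .OrderBy(ordering => ordering)
            .ToList();
    }

    // Returns offsets that were logged more than once; duplicates could hide
    // gaps when only the total number of processed events is compared.
    public static IReadOnlyList<long> FindDuplicates(IEnumerable<long> processedOffsets) =>
        processedOffsets
            .GroupBy(offset => offset)
            .Where(group => group.Count() > 1)
            .Select(group => group.Key)
            .OrderBy(offset => offset)
            .ToList();
}
```

With the missing orderings in hand, each one can be looked up in the journal (timestamp, persistence id) and in a profiler trace to see which query should have returned it.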

So far we have no idea what is causing this problem, or how it can be solved.

Following are fragments of our code. The view actors all inherit from a base class: ViewActorBase.

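using System;
using System.Threading.Tasks;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Streams;
using Akka.Streams.Dsl;

internal abstract class ViewActorBase : ReceiveActor, ILogReceive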
{
    public ViewActorBase()
    {
        // Some initialisation code
        ....

        this.Receive<OffsetMessage>(this.HandleOffsetMessage);
        this.ReceiveAsync<EventEnvelope>(this.UpdateState);

        var sender = this.Sender;
        var self = this.Self;
        this.GetViewActorOffset(self, sender);
    }

    private void HandleOffsetMessage(OffsetMessage offsetMessage)
    {
        this.InitialiseJournalReader(offsetMessage.Offset);
    }

    private void InitialiseJournalReader(long offset)
    {
        // obtain read journal by plugin id
        var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>("akka.persistence.query");

        // materialize stream, consuming events
        var materializer = ActorMaterializer.Create(Context.System);

        // issue query to journal
        Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));

        var self = this.Self;
        source.RunForeach(envelope =>
        {
            // Log the offset as soon as the stream emits the envelope, then forward it to the actor
            this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value);
            self.Tell(envelope);
        }, materializer);
    }

    private void GetViewActorOffset(IActorRef self, IActorRef sender)
    {
        // Initialise repository
        ....

        repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
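    }

    // Implemented by each concrete view actor (e.g. MyViewActor) to update its views.
    protected abstract Task UpdateState(EventEnvelope envelope);
}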

internal class MyViewActor : ViewActorBase
{
    protected override async Task UpdateState(EventEnvelope envelope)
    {
        var offset = (envelope.Offset as Sequence).Value;

        this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);

        // Update views
        ....
    }
}

Is there something wrong in our code or architecture? Are there better solutions?

Additional information: we ran some tests with SQL Server Profiler monitoring the queries sent to the database.

One query on the event journal asked for at most 100 events with an Ordering greater than 204743. The result contained 61 rows.

<Event id="10" name="RPC:Completed">
  <Column id="1" name="TextData">exec sp_executesql N'
        SELECT TOP (@Take)
        e.PersistenceId as PersistenceId, 
        e.SequenceNr as SequenceNr, 
        e.Timestamp as Timestamp, 
        e.IsDeleted as IsDeleted, 
        e.Manifest as Manifest, 
        e.Payload as Payload,
        e.SerializerId as SerializerId,
        e.Ordering as Ordering
        FROM dbo.EventJournal e
        WHERE e.Ordering &gt; @Ordering AND e.Tags LIKE @Tag
        ORDER BY Ordering ASC
        ',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
  <Column id="9" name="ClientProcessID">1169425116</Column>
  <Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
  <Column id="12" name="SPID">82</Column>
  <Column id="13" name="Duration">353890</Column>
  <Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
  <Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
  <Column id="16" name="Reads">326</Column>
  <Column id="17" name="Writes">0</Column>
  <Column id="18" name="CPU">0</Column>
  <Column id="48" name="RowCounts">61</Column>
</Event>

We expected the next query to start at 204804 (204743 + 61). However, it started at 204810. Why is it skipping (or missing) 6 events?
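The expectation of 204804 holds only if every Ordering value after 204743 belongs to a Module-tagged event that the query could see. The profiled query filters on `Tags LIKE @Tag`, while Ordering appears to be a single sequence shared by all rows in dbo.EventJournal, so events with other tags also consume Ordering values. The C# sketch that follows models that paging in memory. `JournalRow` and `TagPagingModel` are hypothetical, and `NextOffset` encodes our assumption (not verified against the Akka.Persistence.SqlServer source) that the reader continues after the Ordering of the last row it received rather than after "previous offset + row count".

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a row of dbo.EventJournal (not an Akka.NET type).
public sealed class JournalRow
{
    public JournalRow(long ordering, string tags)
    {
        Ordering = ordering;
        Tags = tags;
    }

    public long Ordering { get; }

    // Tags stored between semicolons, e.g. ";Module;", as the LIKE '%;Module;%' filter suggests.
    public string Tags { get; }
}

// Hypothetical model of the profiled query:
// SELECT TOP (@Take) ... WHERE Ordering > @Ordering AND Tags LIKE '%;tag;%' ORDER BY Ordering
// Transaction visibility (uncommitted rows) is deliberately not modelled.
public static class TagPagingModel
{
    public static IReadOnlyList<JournalRow> ReadPage(
        IEnumerable<JournalRow> journal, string tag, long afterOrdering, int take) =>
        journal
            .Where(row => row.Ordering > afterOrdering)
            .Where(row => row.Tags.Contains(";" + tag + ";"))
            .OrderBy(row => row.Ordering)
            .Take(take)
            .ToList();

    // Assumption: the next query continues after the Ordering of the last row returned.
    // An empty page leaves the offset unchanged.
    public static long NextOffset(long afterOrdering, IReadOnlyList<JournalRow> page) =>
        page.Count == 0 ? afterOrdering : page[page.Count - 1].Ordering;
}
```

Under that assumption, a page of 61 rows followed by a query starting at 204810 simply means the last returned row had Ordering 204810. Because 61 is fewer than the 100 requested, the query returned every matching row it could see, so the 6 Ordering values between 204744 and 204810 that were not returned either carry other tags, do not exist (IDENTITY values can be skipped, if Ordering is an IDENTITY column), or were not visible to that query. Looking those 6 values up in dbo.EventJournal should show which case applies, and whether any of them are among the events that never reached the view.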
