lbrahim

Reputation: 3810

Akka.Net PreRestart not executed when exception from async handler

I have the following actor, which I am trying to restart so that the failing message is resent to it:

public class BuildActor : ReceivePersistentActor
{
    public override string PersistenceId => "asdad3333";

    private readonly IActorRef _nextActorRef;

    public BuildActor(IActorRef nextActorRef)
    {
        _nextActorRef = nextActorRef;

        Command<Workload>(x => Build(x));

        RecoverAny(workload =>
        {
            Console.WriteLine("Recovering");
        });
    }

    public void Build(Workload Workload)
    {
        var context = Context;
        var self = Self;

        Persist(Workload, async x =>
        {
            //after this line executes
            //application goes into break mode
            //does not execute PreRestart or Recover
            var workload = await BuildTask(Workload);

            _nextActorRef.Tell(workload);

            context.Stop(self);
        });
    }

    private Task<Workload> BuildTask(Workload Workload)
    {
        //works as expected if method made synchronous
        return Task.Run(() =>
        {
            //simulate exception
            if (Workload.ShowException)
            {
                throw new Exception();
            }

            return Workload;
        });
    }

    protected override void PreRestart(Exception reason, object message)
    {
        if (message is Workload workload)
        {
            Console.WriteLine("Prestart");

            workload.ShowException = false;

            Self.Tell(message);
        }
    }
}

Inside the success handler of Persist I am simulating an exception. When it is thrown, the application goes into break mode and the PreRestart hook is not invoked. If I make the BuildTask method synchronous by removing Task.Run, then on exception both PreRestart and Recover<T> are invoked.

I would really appreciate it if someone could point out the recommended pattern for this and where I am going wrong.

Upvotes: 1

Views: 568

Answers (1)

Bartosz Sypytkowski

Reputation: 7542

Most probably, Akka.Persistence is not a good solution for your problem here.

Akka.Persistence uses event sourcing principles to store an actor's state. A few key points are important in this context:

  • What you're sending to the actor is a command. It describes a job you want done. Executing that command may involve some actual processing and may eventually lead to persisting the actor's linear history of state changes in the form of events.
  • In Akka.NET the Persist method is used only to store events. They describe the fact that something has happened: because of that, they cannot be denied and they cannot fail (which is what you're doing in your Persist callback).
  • When an actor restarts at any point in time, it will always try to recreate its own state by replaying all events persisted up to the last known point in time. For this reason it's important that the Recover method focuses only on replaying the actor's state (it can be called multiple times over the same event) and never results in side effects (an example of a side effect is sending an email). Any exception thrown there means that the actor's state is irrecoverably corrupted and that the actor will be killed.
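
One way to apply the points above to the code in the question, as a rough sketch rather than a definitive fix: run the task from the command handler, pipe its outcome back to the actor as a message with PipeTo, and call Persist only once the work has succeeded. The event types WorkloadBuilt and BuildFailed, the minimal Workload class, the persistence id and the retry policy are all assumptions made for illustration. Note also that Persist takes an Action<T> handler, so an async lambda passed there is compiled as async void; an exception thrown after the await most likely escapes the actor's supervision, which would explain why PreRestart never runs.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence;

// Hypothetical stand-in for the Workload type used in the question.
public class Workload
{
    public bool ShowException { get; set; }
}

// Hypothetical event: the build succeeded. This is the only thing persisted.
public sealed class WorkloadBuilt
{
    public WorkloadBuilt(Workload workload) { Workload = workload; }
    public Workload Workload { get; }
}

// Hypothetical message: the build failed. Never persisted.
public sealed class BuildFailed
{
    public BuildFailed(Workload workload, Exception cause)
    {
        Workload = workload;
        Cause = cause;
    }
    public Workload Workload { get; }
    public Exception Cause { get; }
}

public class BuildActor : ReceivePersistentActor
{
    public override string PersistenceId => "build-actor-1"; // assumed id

    private readonly IActorRef _nextActorRef;

    public BuildActor(IActorRef nextActorRef)
    {
        _nextActorRef = nextActorRef;

        // Command: start the fallible work and pipe the outcome back to Self
        // as an ordinary message instead of awaiting inside Persist.
        Command<Workload>(workload =>
        {
            BuildTask(workload).PipeTo(Self,
                success: built => new WorkloadBuilt(built),
                failure: ex => new BuildFailed(workload, ex));
        });

        // The work succeeded, so record that fact as an event.
        Command<WorkloadBuilt>(evt => Persist(evt, persisted =>
        {
            _nextActorRef.Tell(persisted.Workload);
            Context.Stop(Self);
        }));

        // The work failed: retry once without the simulated exception
        // (assumed policy, mirroring the PreRestart logic in the question).
        Command<BuildFailed>(failed =>
        {
            failed.Workload.ShowException = false;
            Self.Tell(failed.Workload);
        });

        // Recovery only rebuilds state; no side effects here.
        Recover<WorkloadBuilt>(evt => { });
    }

    private static Task<Workload> BuildTask(Workload workload)
    {
        return Task.Run(() =>
        {
            if (workload.ShowException)
            {
                throw new Exception("simulated failure");
            }
            return workload;
        });
    }
}
```

With this shape a failure arrives as a plain BuildFailed message, so the actor can decide what to do with it without relying on a restart.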

If you want to resend the message to your actor, you could:

  1. Put a reliable message queue (e.g. RabbitMQ or Azure Service Bus) or log (Kafka or Event Hub) in front of your actor processing pipeline. This is actually the most reasonable scenario in many cases.
  2. Use the at-least-once delivery semantics from Akka.Persistence, but IMHO only if for some reason you cannot use the first solution.
  3. The simplest and least reliable option (since messages reside only in memory and are never persisted) is the dead letter queue. Every unhandled message is sent there. You can subscribe to it and filter the incoming data to detect which messages should be sent again to their recipients.
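
For the third option, a minimal sketch of a dead letter subscriber could look like the following. The actor name DeadLetterResender, the retryTarget parameter and the Workload placeholder class are assumptions for illustration. The message is forwarded to a separate live actor rather than to the original recipient, because resending to a recipient that is still dead would only produce another dead letter.

```csharp
using Akka.Actor;
using Akka.Event;

// Hypothetical stand-in for the Workload type used in the question.
public class Workload
{
    public bool ShowException { get; set; }
}

public class DeadLetterResender : ReceiveActor
{
    public DeadLetterResender(IActorRef retryTarget)
    {
        Receive<DeadLetter>(deadLetter =>
        {
            // Filter: only Workload messages are worth resending here.
            if (deadLetter.Message is Workload workload)
            {
                // Keep the original sender so replies still reach it.
                retryTarget.Tell(workload, deadLetter.Sender);
            }
        });
    }

    protected override void PreStart()
    {
        // Subscribe this actor to DeadLetter events on the system event stream.
        Context.System.EventStream.Subscribe(Self, typeof(DeadLetter));
    }
}
```

Such an actor would be created once per actor system, for example with system.ActorOf(Props.Create(() => new DeadLetterResender(target))), where target is whatever actor should receive the retried messages.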

Upvotes: 5
