Michael D.
Michael D.

Reputation: 1319

AKKA.Net: How to send message from supervisor to child on Actor Restart

Considering this code:

public class ParentActor : ReceiveActor
{
    public ParentActor()
    {
        for( i = 1; i <= 5; i++)
    
        var childActor = Context.ActorOf<ChildActor>($"Child{i}");
        childActor.Tell( new LoadData(i));
        
    }
}

public class ChildActor : ReceiveActor
{
    
    public ChildActor()
    {
        Receive<LoadData>(msg => 
        {
            var data = SomeRiskyOverNetworkCall(msg.AccountId);
        });
    }
}

public class LoadData
{
    
    public int AccountId { get; }
    
    public LoadData(int accountId)
    {
        AccountId = accountId;
    }

}

The child actor performs some risky operation that fails from time to time The call is using some parameters that are passed from parent/supervisor actor after creation of the child.

How to "supervise" this scenario? I need the same LoadData (with the same parameters) message to be processed after restart.

Upvotes: 1

Views: 667

Answers (1)

avikalb
avikalb

Reputation: 76

You can use pre-restart hook with some supervisor strategy. below is a simple example of the same. https://getakka.net/articles/actors/fault-tolerance.html#creating-a-supervisor-strategy

using System;
using System.Threading;
using Akka.Actor;

namespace AkkaConsoleSimple
{
    public class Start
    {
        public Start()
        {
            
        }
        
    }

    public class DoSomething
    {
        public DoSomething(int who)
        {
            Who = who;
        }
        public int Who { get; private set; }
    }

    public class FailedMessage
    {
        public FailedMessage(object who)
        {
            Who = who;
        }
        public object Who { get; private set; }
    }

    public class Child : ReceiveActor
    {
        public Child()
        {
            Receive<DoSomething>(msg =>
            {
                Console.WriteLine($"getting message no {msg.Who}");
                if (msg.Who == 10)
                {
                    throw new StackOverflowException();
                }
                
            });
        }

        protected override void PreRestart(Exception cause, object message)
        {
            Sender.Tell(new FailedMessage(message));
            Self.Tell(message);
            base.PreRestart(cause, message);
        }
    }
    public class Parent : ReceiveActor
    {
        public Parent()
        {
            Receive<Start>(greet =>
            {
                var child = Context.ActorOf<Child>();
                for (int i = 0; i < 11; i++)
                {
                    var msg = new DoSomething(i);
                    child.Tell(msg);
                }
               
            });

            Receive<FailedMessage>(failed => Console.WriteLine(failed.Who));
        }
        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(
                maxNrOfRetries: 10,
                withinTimeRange: TimeSpan.FromMinutes(1),
                localOnlyDecider: ex =>
                {
                    switch (ex)
                    {
                        case StackOverflowException ae:
                            return Directive.Restart;
                        default:
                            return Directive.Escalate;
                    }
                });
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var system = ActorSystem.Create("MySystem");
            var greeter = system.ActorOf<Parent>("parent");
            greeter.Tell(new Start());

            Console.Read();
        }
    }
}

Upvotes: 1

Related Questions