monstertjie_za

Reputation: 7803

There is no active ActorContext, this is most likely due to use of async operations from within this actor

I have a problem that I am not sure how to solve, other than by removing the async methods from my Akka actor.

Here is my actor code:

public class AggregatorActor : ActorBase, IWithUnboundedStash
{
    public IStash Stash { get; set; }

    private AggregatorTimer _aggregatorTimer;
    private IActorSystemSettings _settings;

    private AccountSummary _accountResponse;
    private ContactDetails _contactResponse;
    private AnalyticDetails _analyticsResponse;
    private FinancialDetails _financialResponse;

    private ActorSelection _accountActor;
    private ActorSelection _contactActor;
    private ActorSelection _analyticsActor;
    private ActorSelection _financialActor;

    public AggregatorActor(IActorSystemSettings settings) : base(settings)
    {
        _accountActor = Context.System.ActorSelection(ActorPaths.AccountActorPath);
        _contactActor = Context.System.ActorSelection(ActorPaths.ContactActorPath);
        _analyticsActor = Context.System.ActorSelection(ActorPaths.AnalyticsActorPath);
        _financialActor = Context.System.ActorSelection(ActorPaths.FinancialActorPath);

        _settings = settings;
    }

    #region Public Methods

    public override void Listening()
    {
        ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessageAsync(x));
    }
    private void Busy()
    {
        Receive<ProfilerMessages.ProfilerBase>(x => Stash.Stash());
    }

    private void Aggregate()
    {
        try
        {
            Context.Sender.Tell(AggregatedSummaryResponse.Instance(_accountResponse, _contactResponse, _analyticsResponse, _financialResponse));
        }
        catch (Exception ex)
        {
            ExceptionHandler(ex);
        }
    }

    public override async Task HandleMessageAsync(object msg)
    {
        //If the message is a summary request, generate a new instance of AggregatorTimer in _eventHandlerCollection.
        if (msg is ProfilerMessages.GetSummary)
        {
            //Become busy. Stash
            Become(Busy);

            //Handle different requests
            var clientId = (msg as ProfilerMessages.GetSummary).ClientId;
            await HandleSummaryRequest(clientId);
        }
    }
    private async Task HandleSummaryRequest(string clientId)
    {
        try
        {
            var accountMsg = new AccountMessages.GetAggregatedData(clientId);
            _accountResponse = (await _accountActor.Ask<Messages.AccountMessages.AccountResponseAll>(accountMsg, TimeSpan.FromSeconds(_settings.NumberOfSecondsToWaitForResponse))).AccountDetails;

            //Need to uncomment this
            var contactMsg = new ContactMessages.GetAggregatedContactDetails(clientId);
            _contactResponse = (await _contactActor.Ask<Messages.ContactMessages.ContactResponse>(contactMsg, TimeSpan.FromSeconds(_settings.NumberOfSecondsToWaitForResponse))).ContactDetails;

            var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(clientId);
            _analyticsResponse = (await _analyticsActor.Ask<Messages.AnalyticsMessages.AnalyticsResponse>(analyticMsg, TimeSpan.FromSeconds(_settings.NumberOfSecondsToWaitForResponse))).AnalyticDetails;

            var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(clientId);
            _financialResponse = (await _financialActor.Ask<Messages.FinancialMessages.FinancialResponse>(financialMsg, TimeSpan.FromSeconds(_settings.NumberOfSecondsToWaitForResponse))).FinancialDetails;

            //Start new timer
            _aggregatorTimer = new AggregatorTimer(_settings.NumberOfSecondsToWaitForResponse);
            _aggregatorTimer.TimeElapsed += _aggregatorTimer_TimeElapsed;
        }
        catch (Exception ex)
        {
            ExceptionHandler(ex);
        }
    }
    //Event that is raised when an external timer's time has elapsed.
    private async void _aggregatorTimer_TimeElapsed(object sender, ElapsedTimeHandlerArg e)
    {
        Aggregate();

        _aggregatorTimer = null;

        _accountResponse = null;
        _contactResponse = null;
        _analyticsResponse = null;
        _financialResponse = null;

        //Unstash
        Stash.Unstash();

        //Start listening again
        Become(Listening);
    }

    #endregion
}

Inside the _aggregatorTimer_TimeElapsed event handler, I call the Aggregate function, but the following exception is thrown:

There is no active ActorContext, this is most likely due to use of async operations from within this actor

I think this is caused by the fact that the Aggregate function tries to Tell() the Sender about the aggregated responses, but those Tasks are not yet completed? I might be completely wrong, but I have no idea why this is thrown.

Upvotes: 0

Views: 2681

Answers (1)

Bartosz Sypytkowski

Reputation: 7542

I'm not sure what you even need an AggregatorTimer for. In Akka you have a Context.System.Scheduler object, which can be used to schedule events to happen in the future.
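
As a rough sketch of the scheduler approach, something like the following could replace the external timer. The message types GetSummary and TimedOut, the actor name SchedulingAggregator, the 5 second delay and the placeholder reply are all made up for illustration and are not taken from the question's code; only ScheduleTellOnce, Self, Sender and Context come from Akka.NET itself.

```csharp
using System;
using Akka.Actor;

// Hypothetical message types, named here for illustration only.
public sealed class GetSummary { }

public sealed class TimedOut
{
    public TimedOut(IActorRef replyTo) { ReplyTo = replyTo; }

    // The original requester, captured while its message was being processed.
    public IActorRef ReplyTo { get; }
}

// Hypothetical actor; it assumes a plain ReceiveActor rather than the question's ActorBase.
public class SchedulingAggregator : ReceiveActor
{
    public SchedulingAggregator()
    {
        Receive<GetSummary>(_ =>
        {
            // Sender is only meaningful while a message is being processed,
            // so carry it inside the message that will arrive later.
            var timeout = new TimedOut(Sender);

            // Ask the scheduler to put TimedOut into this actor's mailbox after
            // 5 seconds (an arbitrary delay chosen for this sketch).
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.FromSeconds(5), Self, timeout, Self);
        });

        Receive<TimedOut>(t =>
        {
            // This handler runs as normal message processing, so the actor
            // context is active and replying is safe.
            t.ReplyTo.Tell("aggregated result placeholder");
        });
    }
}
```

Because the timeout arrives as an ordinary message, no code touches Context or Sender from a timer thread, which is the situation that the "no active ActorContext" exception points at.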

Another thing is that you probably shouldn't execute logic inside event handlers. If you really need them in your code, it's better to limit them to sending a message once an event is triggered, e.g.:

Receive<TimedOut>(_ => /* handle timeout message */);

var self = Self; // bind current self to a variable, so it won't change
_aggregatorTimer.TimeElapsed += (sender, e) => self.Tell(new TimedOut(), self);

Upvotes: 1
