Reputation: 815
I have a C# Akka.net project using 6 actors. One actor (LoggingActor) is responsible for taking objects and transmitting them to a web server.
Whenever the LoggingActor encounters a network failure, it seems to bottleneck my entire actor system; everything stalls until the LoggingActor completes its method.
The blocking method in LoggingActor contains this:
try
{
var rs = new RemoteServer();
var rsr = rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.Result;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
And the function SendMovement looks like this:
public async Task<HttpStatusCode> SendMovement(Movement movement, string siteToken, string serverUrl)
{
//Get remote server
var createMovementUrl = serverUrl + MovementsRoute + "?site_token=" + siteToken;
var jsonString = "{ \"movement\": { \"approach\": \"" + movement.Approach + "\", \"exit\": \"" + movement.Exit + "\", \"turntype\": \"" + movement.TurnType + "\", \"objectclass\": \"" + movement.TrafficObjectType + "\", \"created_at\": " + movement.Timestamp.Ticks + "} }";
var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
//Transmit
var response = await Client.PostAsync(createMovementUrl, content);
return response.StatusCode;
}
The purpose of LoggingActor is to attempt to POST my objects to the server, and to buffer them for sending later if the initial attempt fails. It doesn't matter what order they're transmitted in for my application.
How can I modify LoggingActor or the SendMovement function to allow LoggingActor to 'fall through' the LoggingActor's message-handling function by sending off the POST request before waiting for the response?
I think what I want is the server's POST response (or an exception) to trigger another message to the actor, which tells the actor either to remove it from the send-queue or to keep it (i.e. do nothing).
Upvotes: 1
Views: 988
Reputation: 7542
You're calling rs.SendMovement
, which returns a Task, on which you're calling a Result
property. This will lock entire thread living on a current thread pool - since thread pool is shared between all actors/tasks making use of it and usually it contains 1 worker thread per CPU core. This means, that you've effectively turned off entire CPU core until the remote end will respond (or timeout will occurr).
In general, you should never block a thread when working with asynchronous code. In Akka.NET working with Task-based API can be done in 1 of 2 ways.
ReceiveAsync
First solution is to make your actor inherit from ReceiveActor
and use ReceiveAsync
method to define your message handler:
class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAsync<MyMessage>(async message => {
try
{
var rs = new RemoteServer();
var rsr = await rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl);
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
});
}
}
This will create something known as non-reentrant actor - it means, that while you won't block any underlying thread (so that it can still be used by other actors working concurrently), you WILL block current actor, preventing it from receiving any other messages until current async lambda handler will reach the end.
PipeTo
Another approach - which is by default used in pretty much all of the akka internal architecture - is to use PipeTo
method over Task:
class MyActor : ActorBase
{
// start with an actor in ready state
protected override bool Receive(object message) => Ready(message);
bool Ready(object message)
{
switch (message)
{
case MyMessage msg:
var rs = new RemoteServer();
rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.PipeTo(Self,
success: rsr => new Status.Success(rsr),
failure: ex => new Status.Failure(ex));
Become(Waiting);
return true;
default: return false;
}
}
bool Waiting(object message)
{
switch (message)
{
case Status.Success success:
var rsr = (HttpStatusCode)success.Status;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
Become(Ready);
return true;
case Status.Failure failure:
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
Become(Ready);
return true;
default: return false;
}
}
}
This way you're turning your actor into a custom state machine. The piece of code responsible for awaiting logic here is split into two steps - here represented as Ready
/Waiting
states. After sending the async request, actor is changing its behavior - meaning that he can now handle different set of incoming messages or react on them differently. Returned boolean informs actor system if message was handled by current actor or not - this may trigger unhandled call, which by default will log unhandled message.
One of the advantages of that approach is that this actor is reentrant - it means, that while awaiting for the SendMovement
to return a result (or fail), this actor is free to take and process other messages.
Upvotes: 3