Reputation: 11
Last year I started using the actor model with Akka.NET. I have now started using MassTransit (v3.5.7) with RabbitMQ as well, and I really love both!
In the request/response scenario, my request consumer executes its business logic by wrapping the request in a new message and asking an actor to do the actual work, so the consumer essentially awaits the actor's Ask method. This extension method takes the message and a timeout as arguments, and I'd like to pass it the same timeout value that the originator of the request used.
Is there a simple way to obtain, in the consumer context, the original timeout used by the caller in order to pass it to the actor's Ask method?
Note: I'd like to avoid adding the timeout to the request interface.
Upvotes: 0
Views: 2669
Reputation: 11
I finally found a solution! It's quite easy (once you have looked through the MassTransit source code :-) and it works for me, but if anyone has advice or hints, please let me know.
Basically, I created a support library for MassTransit and added a class with two extension methods:
The CreateRequestClientWithTimeoutHeader() method creates a request client and stores the string representation of the given timeout (expressed in seconds) in a message header. This method is used on the client side.
The GetClientTimeout() method reads that value from the message header and converts it to a TimeSpan. This method is used in the consumer.
Here's the code:
using System;
using System.Globalization;
using MassTransit;

public static class MassTransitExtMethods
{
    // Header key that carries the client's request timeout, in seconds.
    private const string ClientTimeoutHeaderKey = "__ClientTimeout__";

    // Client side: creates a request client that also writes the timeout into a header.
    public static IRequestClient<TRequest, TResponse> CreateRequestClientWithTimeoutHeader<TRequest, TResponse>(
        this IBus bus,
        Uri address,
        TimeSpan timeout,
        TimeSpan? ttl = default(TimeSpan?),
        Action<SendContext<TRequest>> callback = null)
        where TRequest : class
        where TResponse : class
    {
        return bus.CreateRequestClient<TRequest, TResponse>(
            address,
            timeout,
            ttl,
            context =>
            {
                // Invariant culture keeps the decimal separator stable across machines.
                context.Headers.Set(
                    ClientTimeoutHeaderKey,
                    timeout.TotalSeconds.ToString(CultureInfo.InvariantCulture));

                // Still run the caller's own callback, if one was supplied.
                callback?.Invoke(context);
            });
    }

    // Consumer side: returns the client's timeout, or null if the header is missing or invalid.
    public static TimeSpan? GetClientTimeout(this ConsumeContext consumeContext)
    {
        string headerValue = consumeContext.Headers.Get<string>(ClientTimeoutHeaderKey);
        if (string.IsNullOrEmpty(headerValue))
        {
            return null;
        }

        double timeoutInSeconds;
        if (double.TryParse(headerValue, NumberStyles.Any, CultureInfo.InvariantCulture, out timeoutInSeconds))
        {
            return TimeSpan.FromSeconds(timeoutInSeconds);
        }

        return null;
    }
}
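The header round trip can be checked on its own, without a bus or a broker. The following is only a minimal sketch of the same encode/decode logic using plain .NET types; the class name TimeoutHeaderRoundTrip and the Encode/Decode helpers are made up for illustration and are not part of MassTransit or of the extension class above.

```csharp
using System;
using System.Globalization;

// Hypothetical helper that mirrors the header encoding used above,
// isolated from MassTransit so it can be run as a console program.
public static class TimeoutHeaderRoundTrip
{
    // Same conversion the client-side extension applies before setting the header.
    public static string Encode(TimeSpan timeout)
    {
        return timeout.TotalSeconds.ToString(CultureInfo.InvariantCulture);
    }

    // Same conversion the consumer-side extension applies after reading the header.
    public static TimeSpan? Decode(string headerValue)
    {
        if (string.IsNullOrEmpty(headerValue))
        {
            return null;
        }

        double seconds;
        if (double.TryParse(headerValue, NumberStyles.Any, CultureInfo.InvariantCulture, out seconds))
        {
            return TimeSpan.FromSeconds(seconds);
        }

        return null;
    }

    public static void Main()
    {
        string header = Encode(TimeSpan.FromSeconds(10.0));
        Console.WriteLine(header);                        // prints "10"
        Console.WriteLine(Decode(header).HasValue);       // a valid header yields a value
        Console.WriteLine(Decode("abc").HasValue);        // an invalid header yields null
        Console.WriteLine(Decode(null).HasValue);         // a missing header yields null
    }
}
```

Because both directions use the invariant culture, a fractional timeout such as 2.5 seconds is written with a dot and parsed back the same way, regardless of the regional settings on the client or the server.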
To use it, create the client using the new extension method:
var client = mybus.CreateRequestClientWithTimeoutHeader<IMyRequest, IMyResponse>(
    new Uri(serviceAddress),
    TimeSpan.FromSeconds(10.0));
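And here is a very simple example of a consumer that delegates the business logic to an Akka.NET actor (please note that the implementation is not complete):
using System;
using System.Threading.Tasks;
using Akka.Actor;
using MassTransit;

public class MyReqRespProcessor : IConsumer<IMyRequest>
{
    // Not shown here: how _myActor is obtained and where PredefinedTimeout is defined.
    private readonly IActorRef _myActor;

    public async Task Consume(ConsumeContext<IMyRequest> context)
    {
        // Timeout sent by the client, or null if the header is missing or invalid.
        TimeSpan? clientTimeout = context.GetClientTimeout();

        // Fall back to the predefined timeout when the client did not send one.
        var response = await _myActor
            .Ask<IMyResponse>(context.Message, clientTimeout ?? PredefinedTimeout)
            .ConfigureAwait(false);

        await context
            .RespondAsync<IMyResponse>(response)
            .ConfigureAwait(false);
    }
}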
In a real scenario, with a lot of requests, the actor may be a router, configured according to the endpoint configuration (for example the prefetch count value).
I know this is not a perfect solution, but it gives the server side an upper bound on the processing time. Because of network delays, the client may hit its timeout before the actor stops processing the request. Either way, the consumer waits for the actor for at most the time specified by the client, and that is what I wanted to achieve.
Upvotes: 1