Reputation: 1
I'm using MassTransit on RabbitMQ. I have a routing slip with activities started with a Request / Response (I need to inform caller when routing slip has been completed). For some reason I cannot receive responses (TimeoutException) and I found RoutingSlipActivityCompleted in the skipped queue. Can someone help me?
Here are some pieces of code:
public class UpsertExternalRequestConsumer : IConsumer<IUpsertExternalRequest>, IConsumer<RoutingSlipCompleted>, IConsumer<RoutingSlipFaulted>
public async Task Consume(ConsumeContext<IUpsertExternalRequest> context)
{
_logger.LogTrace("IUpsertExternalRequest RECEIVED: {@message}", context.Message);
var routingSlip = CreateRoutingSlip(context);
await context.Execute(routingSlip);
}
private RoutingSlip CreateRoutingSlip(ConsumeContext<IUpsertExternalRequest> context)
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.SetVariables(new
{
context.RequestId,
context.ResponseAddress
});
//InsertIdentity
builder.AddActivity(InsertIdentityActivity.Name, InsertIdentityActivity.Queue,
new
{
IdentityFields = ...
});
//InsertExtendedIdentity
builder.AddActivity(InsertExtendedIdentityActivity.Name, InsertExtendedIdentityActivity.Queue,
new
{
IdentityFields = EntitiesExtensions.MapToKeyValues(context.Message.Identity.AdditionalInfo, BtpExternalFieldsMapping._ExtIdentitydict)
});
//other activities...
builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed & RoutingSlipEvents.Faulted);
return builder.Build();
}
public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
{
var requestId = context.GetVariable<Guid>("RequestId");
var responseAddress = context.GetVariable<Uri>("ResponseAddress");
var responseEndpoint = await context.GetSendEndpoint(responseAddress);
await responseEndpoint.Send<IUpsertExternalResponse>(
new
{
Success = true,
__RequestId = requestId,
Data = context.Message.Variables["identityId"]
});
}
public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
{
var requestId = context.GetVariable<Guid>("RequestId");
var responseAddress = context.GetVariable<Uri>("ResponseAddress");
if (requestId.HasValue && responseAddress != null)
{
var responseEndpoint = await context.GetSendEndpoint(responseAddress);
var validationError = context.Message.Variables.TryGetValue("ErrorCode", out string errorValidation);
var exceptions = context.Message.ActivityExceptions.Select(x => x.ExceptionInfo);
_logger.LogTrace("RoutingSlipFaulted: sending IBaseResponse. requestId:{requestId}, ResponseAddress:{responseAddress}", requestId, responseAddress);
await responseEndpoint.Send<IUpsertExternalResponse>(
new
{
Success = false,
ErrorCode = errorValidation is null ? exceptions.Select(x => x.Message).FirstOrDefault() : ScsExceptions.ScsValidationError.ToString(),
__RequestId = requestId,
});
}
}
//And here is how i call the request:
var response = await _requestClient.GetResponse<IUpsertExternalResponse>(new
{
//some data...
}, cancellationToken);
I receive a timeout exception trying to receive request response and I found a RoutingSlipActivityCompleted in skipped queue
I check messages bindings and they seem to be correct. Note that this project was working and I can't understand why it doesn't anymore.
This is a sample message I found in skipped queue:
{
"messageId": "58060000-569c-0050-162a-08dc1e7bd20e",
"requestId": null,
"correlationId": "58060000-569c-0050-30cb-08dc1e7bd20c",
"conversationId": "fc2b0000-569c-0050-525f-08dc1e7bd1ec",
"initiatorId": "fc2b0000-569c-0050-8abe-08dc1e7bd1f0",
"sourceAddress": "rabbitmq://****/update-extended-identity_execute",
"destinationAddress": "rabbitmq://****/UpsertExternalRequest",
"responseAddress": null,
"faultAddress": null,
"messageType": [
"urn:message:MassTransit.Courier.Contracts:RoutingSlipActivityCompleted"
],
"message": {
"trackingNumber": "fc2b0000-569c-0050-8abe-08dc1e7bd1f0",
"timestamp": "2024-01-26T14:33:53.3592779Z",
"duration": "00:00:00.0122791",
"executionId": "58060000-569c-0050-30cb-08dc1e7bd20c",
"activityName": "UpdateExtendedIdentity",
"host": {
"machineName": "IT4076YW",
"processName": "****",
"processId": 1624,
"assembly": "****",
"assemblyVersion": "1.0.9.0",
"frameworkVersion": "6.0.26",
"massTransitVersion": "8.0.13.0",
"operatingSystemVersion": "Microsoft Windows NT 10.0.17763.0"
},
"arguments": {
"loggedUser": 289,
"appCode": "****",
"identityId": 124288,
"identityFields": [
//business data...
},
"data": {
"identityId": 124288,
//***
},
"variables": {
"requestId": "fc2b0000-569c-0050-41fc-08dc1e7bd1e9",
"responseAddress": "rabbitmq://****Api_bus_9oioyynsuoyfy9z6bdqbh6ofrt?temporary=true",
"identityId": 124288,
"idLoggedUser": 289
}
},
"expirationTime": null,
"sentTime": "2024-01-26T14:33:53.3717034Z",
"headers": {},
"host": {
"machineName": "IT4076YW",
"processName": "****",
"processId": 1624,
"assembly": "****",
"assemblyVersion": "1.0.9.0",
"frameworkVersion": "6.0.26",
"massTransitVersion": "8.0.13.0",
"operatingSystemVersion": "Microsoft Windows NT 10.0.17763.0"
}
}
Updated: Fault during startup The reason was an error during startup. Why is this happening sometims?
Upvotes: 0
Views: 107