dvRoss
dvRoss

Reputation: 1

MassTransit RoutingSlipActivityCompleted goes into skipped queue

I'm using MassTransit on RabbitMQ. I have a routing slip with activities started with a Request / Response (I need to inform caller when routing slip has been completed). For some reason I cannot receive responses (TimeoutException) and I found RoutingSlipActivityCompleted in the skipped queue. Can someone help me?

Here are some pieces of code:

public class UpsertExternalRequestConsumer : IConsumer<IUpsertExternalRequest>, IConsumer<RoutingSlipCompleted>, IConsumer<RoutingSlipFaulted>


public async Task Consume(ConsumeContext<IUpsertExternalRequest> context)
{
    _logger.LogTrace("IUpsertExternalRequest RECEIVED: {@message}", context.Message);

    var routingSlip = CreateRoutingSlip(context);

    await context.Execute(routingSlip);
}

private RoutingSlip CreateRoutingSlip(ConsumeContext<IUpsertExternalRequest> context)
{
    var builder = new RoutingSlipBuilder(NewId.NextGuid());

    builder.SetVariables(new
    {
        context.RequestId,
        context.ResponseAddress
    });

     
    //InsertIdentity
    builder.AddActivity(InsertIdentityActivity.Name, InsertIdentityActivity.Queue,
    new
    {
        IdentityFields = ...
    });

    //InsertExtendedIdentity
    builder.AddActivity(InsertExtendedIdentityActivity.Name, InsertExtendedIdentityActivity.Queue, 
new
    {
        IdentityFields = EntitiesExtensions.MapToKeyValues(context.Message.Identity.AdditionalInfo, BtpExternalFieldsMapping._ExtIdentitydict)
    });

    //other activities...
    

    builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed & RoutingSlipEvents.Faulted);
 
    return builder.Build();
}


public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
{
    var requestId = context.GetVariable<Guid>("RequestId");
    var responseAddress = context.GetVariable<Uri>("ResponseAddress");

    var responseEndpoint = await context.GetSendEndpoint(responseAddress);

    await responseEndpoint.Send<IUpsertExternalResponse>(
        new
        {
            Success = true,
            __RequestId = requestId,
            Data = context.Message.Variables["identityId"]
        });
}

public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
{
    var requestId = context.GetVariable<Guid>("RequestId");
    var responseAddress = context.GetVariable<Uri>("ResponseAddress");

    if (requestId.HasValue && responseAddress != null)
    {
        var responseEndpoint = await context.GetSendEndpoint(responseAddress);

        var validationError = context.Message.Variables.TryGetValue("ErrorCode", out string errorValidation);

        var exceptions = context.Message.ActivityExceptions.Select(x => x.ExceptionInfo);

        _logger.LogTrace("RoutingSlipFaulted: sending IBaseResponse. requestId:{requestId}, ResponseAddress:{responseAddress}", requestId, responseAddress);

        await responseEndpoint.Send<IUpsertExternalResponse>(
                new
                {
                    Success = false,
                    ErrorCode = errorValidation is null ? exceptions.Select(x => x.Message).FirstOrDefault() : ScsExceptions.ScsValidationError.ToString(),
                    __RequestId = requestId,
                });
    }
}


//And here is how i call the request:
 var response = await _requestClient.GetResponse<IUpsertExternalResponse>(new
 {
      //some data...
 }, cancellationToken);

I receive a timeout exception trying to receive request response and I found a RoutingSlipActivityCompleted in skipped queue

I check messages bindings and they seem to be correct. Note that this project was working and I can't understand why it doesn't anymore.

This is a sample message I found in skipped queue:

{

  "messageId": "58060000-569c-0050-162a-08dc1e7bd20e",

  "requestId": null,

  "correlationId": "58060000-569c-0050-30cb-08dc1e7bd20c",

  "conversationId": "fc2b0000-569c-0050-525f-08dc1e7bd1ec",

  "initiatorId": "fc2b0000-569c-0050-8abe-08dc1e7bd1f0",

  "sourceAddress": "rabbitmq://****/update-extended-identity_execute",

  "destinationAddress": "rabbitmq://****/UpsertExternalRequest",

  "responseAddress": null,

  "faultAddress": null,

  "messageType": [

    "urn:message:MassTransit.Courier.Contracts:RoutingSlipActivityCompleted"

  ],

  "message": {

    "trackingNumber": "fc2b0000-569c-0050-8abe-08dc1e7bd1f0",

    "timestamp": "2024-01-26T14:33:53.3592779Z",

    "duration": "00:00:00.0122791",

    "executionId": "58060000-569c-0050-30cb-08dc1e7bd20c",

    "activityName": "UpdateExtendedIdentity",

    "host": {

      "machineName": "IT4076YW",

      "processName": "****",

      "processId": 1624,

      "assembly": "****",

      "assemblyVersion": "1.0.9.0",

      "frameworkVersion": "6.0.26",

      "massTransitVersion": "8.0.13.0",

      "operatingSystemVersion": "Microsoft Windows NT 10.0.17763.0"

    },

    "arguments": {

      "loggedUser": 289,

      "appCode": "****",

      "identityId": 124288,

      "identityFields": [

        //business data... 
                
    },

    "data": {

      "identityId": 124288,

      //*** 
    },

    "variables": {

      "requestId": "fc2b0000-569c-0050-41fc-08dc1e7bd1e9",

      "responseAddress": "rabbitmq://****Api_bus_9oioyynsuoyfy9z6bdqbh6ofrt?temporary=true",

      "identityId": 124288,

      "idLoggedUser": 289

    }

  },

  "expirationTime": null,

  "sentTime": "2024-01-26T14:33:53.3717034Z",

  "headers": {},

  "host": {

    "machineName": "IT4076YW",

    "processName": "****",

    "processId": 1624,

    "assembly": "****",

    "assemblyVersion": "1.0.9.0",

    "frameworkVersion": "6.0.26",

    "massTransitVersion": "8.0.13.0",

    "operatingSystemVersion": "Microsoft Windows NT 10.0.17763.0"

  }

}

Updated: Fault during startup The reason was an error during startup. Why is this happening sometims?

Upvotes: 0

Views: 107

Answers (0)

Related Questions