Ali Eshghi
Ali Eshghi

Reputation: 1243

RoutingSlip not routing back to publisher

Imagine I have this publisher:

public async Task<IActionResult> AliEshghiPaymentUrl(GeneralAuthenticatePaymentRequestModel model)
{
    try
    {
        //var result = await _mediator.Send(new Commands.GeneratePaymentUrlCommand(model, Guid.NewGuid().ToString("N")));
        var result = await _massClientRequest.GetResponse<ResultPayMentInDBResponse>(new GeneralAuthenticatePaymentRequestModel
                {
                    MyRequestedUrl = model.MyRequestedUrl,
                    AccountRequest = model.AccountRequest,
                    UserId = model.UserId
                });

        return Ok(result);
    }
    catch (Exception ex)
    {
        throw;
    }
}

And by this micro request I land in my target consumer:

public async Task Consume(ConsumeContext<GeneralAuthenticatePaymentRequestModel> context)
{
    var routingSlip = CreateRoutingSlip(context);
    await context.Execute(routingSlip);
}

private RoutingSlip CreateRoutingSlip(ConsumeContext<GeneralAuthenticatePaymentRequestModel> context)
{
    var builder = new RoutingSlipBuilder(NewId.NextGuid());
    builder.AddSubscription(context.ReceiveContext.InputAddress,
                            RoutingSlipEvents.Completed
                               | RoutingSlipEvents.Faulted);

    builder.AddVariable("userId", context.Message.UserId);
    builder.AddVariable(nameof(context.RequestId), context.RequestId);
    builder.AddVariable(nameof(context.ResponseAddress), context.ResponseAddress);
    builder.AddVariable(nameof(context.FaultAddress), context.FaultAddress);
    builder.AddVariable("Request", context.Message);

    var dbQueueName = _formatter.
                ExecuteActivity<CheckAuthenticatedRequestFromDBActivity, AuthenticatePaymentRequestModel>();
    builder.AddActivity(nameof(SetAuthenticatedRequestWithDbActivity),
                new Uri($"queue:{dbQueueName}"), 
                new {});
    var apiQueueName = _formatter.ExecuteActivity<DesideWhichBankToChoseActivity, PaymentBankRequestModel>();
    builder.AddActivity(nameof(DesideWhichBankToChoseActivity), new Uri($"queue:{apiQueueName}"), new
            {
                MyRequestUrl ="/myeshghi"
            });

    var addToDbQueueName = _formatter.ExecuteActivity<SetChoosedBankInDBActivity, SetPaymnetResultInDBRequest>();
    builder.AddActivity(nameof(SetChoosedBankInDBActivity), new Uri($"queue:{addToDbQueueName}"), new
            {
                MyRequestUrl = "/myeshghi"
            });
            
    return builder.Build();
}

After I route finally I am landing to my RoutingSlipCompleted:

public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
{
    var isSucceed = context.GetVariable<bool>("IsSucceed");
    var model = new ResultPayMentInDBResponse();
    model.IsSucceed = isSucceed ==null? false: (bool)isSucceed;

    var requestId = context.GetVariable<Guid>(nameof(ConsumeContext.RequestId));
    var responseAddress = context.GetVariable<Uri>(nameof(ConsumeContext.ResponseAddress));

    if (requestId.HasValue && responseAddress != null)
    {
        var responseEndpoint = await context.GetSendEndpoint(responseAddress);
        await responseEndpoint.Send<ResultPayMentInDBResponse>(model);

        // Console.WriteLine("end");
        // await responseEndpoint.Send<ResultPayMentInDBResponse>(model);
    }
}

Here with wait responseEndpoint.Send<ResultPayMentInDBResponse>(model); I want to route back to my publisher, but I get

{"Timeout waiting for response, RequestId: cc200000-fb75-4ced-c30e-08dbd87c612b"}

Also for more information I share my program.cs:

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMassTransit(x =>
{
    x.SetSnakeCaseEndpointNameFormatter();
    x.SetKebabCaseEndpointNameFormatter();
    //x.AddConsumer<AuthenticationFlowConsumer>();
    x.AddConsumer<PaymentRequestFlowConsumer>();

    x.AddActivitiesFromNamespaceContaining<CourierActivities>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(builder.Configuration["RabbitMQModel:HostAddress"], port:
            ushort.Parse(builder.Configuration["RabbitMQModel:Port"]), "/", h =>
        {
            h.Username(builder.Configuration["RabbitMQModel:Username"]);
            h.Password(builder.Configuration["RabbitMQModel:Password"]);
        });

        //cfg.ReceiveEndpoint("Authentication_Flow_Consumer", e =>
        //{
        //    e.ConfigureConsumer<AuthenticationFlowConsumer>(context);
        //});
        cfg.ReceiveEndpoint("PAYMENTFLOW_Request", e =>
        {
            e.ConfigureConsumer<PaymentRequestFlowConsumer>(context);
        });

        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});
var app = builder.Build();

app.MapGet("/", () => "business flow micro started!");

app.Run();

Upvotes: 0

Views: 42

Answers (0)

Related Questions