Fatih ARI

Reputation: 15

MassTransit Saga State Machine - Timeout caught while waiting for response from State Machine

In the Order service, the CreateOrderCommand Handle method in OrderCommandHandler publishes an event to the State Machine, and the State Machine then publishes another event to the Payment Service. The Payment Service consumes this event and publishes an event containing CreatePaymentSessionUrl back to the State Machine. Once these stages are complete, the State Machine should send the CreatePaymentSessionUrl data to the Order Service with RespondAsync. However, the GetResponse call in the CreateOrderCommand Handle method never receives this data, and we get a timeout error. I think this is because the steps run asynchronously.

I reviewed this article, but it seemed to cover a slightly different topic.

The event is triggered in the Order Service. Here is the relevant code:

//Order Microservice
private readonly IPublishEndpoint _publishEndpoint;
private readonly IRequestClient<GetPaymentSessionUrlRequest> _paymentClient;

// Both fields are assigned in the constructor.

public async Task<BaseResponseModel> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
    var orderCreatedRequestEvent = new OrderCreatedRequestEvent
    {
        OrderId = orderRes.Id,
        TotalPrice = orderRes.GrandTotalCost,
    };

    // Step 1: Publish to the Saga State Machine; the information is stored in the saga instance.
    await _publishEndpoint.Publish<IOrderCreatedRequestEvent>(orderCreatedRequestEvent, cancellationToken);

    // Step 2: When the state machine handles IOrderCreatedRequestEvent from Step 1, it publishes an event to the Payment service.
    // The Payment service consumes it and publishes PaymentSessionCreatedEvent back to the state machine.
    // The relevant code for Step 2 runs in the Payment service.

    // Step 3: Once PaymentSessionCreatedEvent from Step 2 has been handled, this request should succeed and return the response data.
    // I think this code runs right after the publish in Step 1, before Step 2 has finished, and that is why it times out.
    var (foundStatus, notFoundStatus) =
        await _paymentClient.GetResponse<PaymentSessionUrlResponse, PaymentSessionUrlNotFoundResponse>(
            new GetOrderStateRequestMessage { OrderId = orderRes.Id }, cancellationToken,
            TimeSpan.FromSeconds(10));
    if (foundStatus.IsCompletedSuccessfully)
    {
        var urlResponse = await foundStatus;
        return new BaseResponseModel
            { Data = new CreatePaymentSessionUrlResponse { Url = urlResponse.Message.CreatePaymentSessionUrl } };
    }
    else
    {
        var urlResponse = await notFoundStatus;
        return new BaseResponseModel
        {
            StatusCode = HttpStatusCode.BadRequest,
            ErrorDescription = "There is a problem and this problem will be shown generically later."
        };
    }
}

And this is my StateMachine code:

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    private readonly ILogger<OrderStateMachine> _logger;
    public Event<IOrderCreatedRequestEvent> OrderCreatedRequestEvent { get; private set; }
    public Event<IPaymentSessionCreatedEvent> PaymentSessionCreatedEvent { get; private set; }
    public Event<GetPaymentSessionUrlRequest> PaymentSessionUrlRequestEvent { get; private set; }
    public State OrderCreated { get; private set; }
    public State PaymentSessionCreated { get; private set; }

    public OrderStateMachine(ILogger<OrderStateMachine> logger)
    {
        _logger = logger;

        InstanceState(x => x.CurrentState);

        Event(() => OrderCreatedRequestEvent,
            x =>
            {
                x.CorrelateBy((instance, context) => instance.OrderId == context.Message.OrderId);
                x.SelectId(context => Guid.NewGuid());
            });

        Event(() => PaymentSessionCreatedEvent, y => y.CorrelateById(x => x.OrderId, z => z.Message.CorrelationId));
        Event(() => PaymentSessionUrlRequestEvent, x =>
        {
            x.CorrelateById(instance => instance.OrderId, z => z.Message.OrderId);
            x.OnMissingInstance(instance => instance.ExecuteAsync(async context =>
            {
                if (context.RequestId.HasValue)
                {
                    await context.RespondAsync<PaymentSessionUrlNotFoundResponse>(new { context.Message.OrderId });
                }
            }));
        });
        
        Initially(
            When(OrderCreatedRequestEvent)
                .Then(context =>
                {
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CreatedDate = DateTime.Now;
                    context.Saga.TotalPrice = context.Message.TotalPrice;
                })
                // The Payment service will consume this event. Step 2 starts here.
                .Publish(context => new OrderCreatedEvent(context.Saga.OrderId)
                {
                    OrganizationId = context.Message.OrganizationId,
                    CartId = context.Message.CartId,
                    TotalPrice = context.Message.TotalPrice,
                    CurrencyCode = context.Message.CurrencyCode,
                    OrderLines = context.Message.OrderLines
                })
                .TransitionTo(OrderCreated)
        );
        
        //Step2 ends here.
        During(OrderCreated,
            When(PaymentSessionCreatedEvent)
                .Then(context =>
                {
                    context.Saga.CreatePaymentSessionUrl = context.Message.CreatePaymentSessionResult.Url;
                    _logger.LogInformation("PaymentSessionCreatedEvent after: {PaymentSessionCreated}", context.Saga);
                })
                .TransitionTo(PaymentSessionCreated)
        );
        
        //I was hoping Step3 would work here.
        //Both During and DuringAny give Timeout errors.
        During(PaymentSessionCreated,
            When(PaymentSessionUrlRequestEvent)
                .Then(context =>
                {
                    _logger.LogInformation("PaymentSessionUrlEvent received. Congratulations, your main problem is solved!");
                })
                .RespondAsync(context => context.Init<PaymentSessionUrlResponse>(new
                {
                    context.Saga.CreatePaymentSessionUrl
                }))
        );
    }
}

As a workaround, I have occasionally gotten successful results by placing await Task.Delay(3000, cancellationToken) before the GetResponse request in the Order Service, or by wrapping the request in retry policies or while loops. However, I don't think this is good practice: these delays are difficult to tune when there is a bottleneck.
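
For illustration, the polling workaround described above could be sketched roughly as follows. This is a minimal sketch, not the code from the actual service: Polling, RetryUntilFoundAsync, and its parameters are hypothetical names, and the attempt delegate stands in for a call that wraps GetResponse and returns null when the not-found response comes back.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Polling
{
    // Hypothetical helper (not part of MassTransit): invokes `attempt` until it
    // returns a non-null value or `maxAttempts` is reached, waiting `delay`
    // between consecutive attempts. Returns null if every attempt came back empty.
    public static async Task<T?> RetryUntilFoundAsync<T>(
        Func<CancellationToken, Task<T?>> attempt,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default) where T : class
    {
        for (var i = 1; i <= maxAttempts; i++)
        {
            var result = await attempt(cancellationToken);
            if (result is not null)
                return result;

            // Do not wait after the final attempt.
            if (i < maxAttempts)
                await Task.Delay(delay, cancellationToken);
        }

        return null;
    }
}
```

Even in this form, the attempt count and the delay have to be guessed, which is the tuning difficulty mentioned above.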

How can I make all of these events run in order, so that the CreatePaymentSessionUrl data is returned as the result of the original request to the Order Service?
