Tarek
Tarek

Reputation: 43

Missing configuration to well publish event from MassTransit consumer to MassTransit Saga (using MassTransit with Mediator)

I have a series of event coordinated by a Saga

From the controller I persist data in db, then I publish an event to my saga:

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

The Saga consume the first event then send new command:


public class ProductSaga:
    ISaga,
    InitiatedBy<ProductSubmittedEvent>,
    Orchestrates<AcecptedFilterEvent>,
    Orchestrates<RefusedFilterEvent>
{
    public Guid CorrelationId { get; set; }
    public string State { get; private set; } = "Not Started";

    public readonly Uri filterEndpoint = new Uri(
        $"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(FilterCommand))}");

    public async Task Consume(ConsumeContext<ApiSubmittedEvent> context)
    {
        //Send new command to filter step
        var sendEndpoint = await context.GetSendEndpoint(filterEndpoint);
        await sendEndpoint.Send<FilterCommand>(new { CorrelationId, context.Message.Label});
    }

    // I have two other events to consume according to command result    
    public Task Consume(ConsumeContext<AcecptedFilterEvent> context)
    {   
        //if OK send new command to next step
        //...
        return Task.CompletedTask;
    }
    
    public Task Consume(ConsumeContext<RefusedFilterEvent> context)
    {
        //if FilterCommand refused do it again
        //... 
        return Task.CompletedTask;
    }
}

So here my consumer code for FilterCommand:


public class FilterCommandConsumer : IConsumer<FilterCommand>
{
    private readonly ILogger<FilterCommand> _logger;

    public FilterCommandConsumer (ILogger<FilterCommand> logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<FilterCommand> context)
    {
        _logger?.LogInformation($"Consuming FilterCommand- {context.Message.CorrelationId}");
       
        try
        {
            //call a service to handle the filtering 
            //...
            
            //here the problem: when publish thread exit and app still run to infinity
            await context.Publish<AcecptedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "AcecptedFilter"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "RefusedFilter",
                Error = ex.Message
            });
            throw;
        }
    }
}

The problem:

When consuming my command, I want to publish new events AcecptedFilterEvent & RefusedFilterEvent, there I can not do the publish:

I want to publish an event and consume it on Saga to start next steps.

I tried to inject IMediator on my consumer class to publish messages with _mediator.Publish() but I got the same weird behavior.

Here is my startup.cs config:


        //configure MassTransit
        services.AddMediator(cfg =>
        {
            cfg.AddConsumersFromNamespaceContaining<FilterCommandConsumer>();                
            cfg.AddSaga<ProductSaga>().InMemoryRepository();
        });

If you have any recommendation ideas thanks for sharing and challenging me 😊

Upvotes: 0

Views: 304

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33540

Because you are using mediator, which uses the caller's async call context to deliver messages to consumers, you're basically creating a really nasty call stack and are deadlocking on the saga instance.

(controller) => (saga) => (consumer) => (saga, oops, it's locked by earlier saga)

Upvotes: 0

Related Questions