Reputation: 43
I have a series of event coordinated by a Saga
From the controller I persist data in db, then I publish an event to my saga:
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator _mediator;
public MyController(IProductService productService, IMediator mediator)
{
_productService = productService;
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
The Saga consume the first event then send new command:
public class ProductSaga:
ISaga,
InitiatedBy<ProductSubmittedEvent>,
Orchestrates<AcecptedFilterEvent>,
Orchestrates<RefusedFilterEvent>
{
public Guid CorrelationId { get; set; }
public string State { get; private set; } = "Not Started";
public readonly Uri filterEndpoint = new Uri(
$"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(FilterCommand))}");
public async Task Consume(ConsumeContext<ApiSubmittedEvent> context)
{
//Send new command to filter step
var sendEndpoint = await context.GetSendEndpoint(filterEndpoint);
await sendEndpoint.Send<FilterCommand>(new { CorrelationId, context.Message.Label});
}
// I have two other events to consume according to command result
public Task Consume(ConsumeContext<AcecptedFilterEvent> context)
{
//if OK send new command to next step
//...
return Task.CompletedTask;
}
public Task Consume(ConsumeContext<RefusedFilterEvent> context)
{
//if FilterCommand refused do it again
//...
return Task.CompletedTask;
}
}
So here my consumer code for FilterCommand
:
public class FilterCommandConsumer : IConsumer<FilterCommand>
{
private readonly ILogger<FilterCommand> _logger;
public FilterCommandConsumer (ILogger<FilterCommand> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<FilterCommand> context)
{
_logger?.LogInformation($"Consuming FilterCommand- {context.Message.CorrelationId}");
try
{
//call a service to handle the filtering
//...
//here the problem: when publish thread exit and app still run to infinity
await context.Publish<AcecptedFilterEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "AcecptedFilter"
});
}
catch (Exception ex)
{
await context.Publish<RefusedFilterEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "RefusedFilter",
Error = ex.Message
});
throw;
}
}
}
When consuming my command, I want to publish new events AcecptedFilterEvent
& RefusedFilterEvent
, there I can not do the publish:
I want to publish an event and consume it on Saga
to start next steps.
I tried to inject IMediator
on my consumer class to publish messages with _mediator.Publish()
but I got the same weird behavior.
Here is my startup.cs
config:
//configure MassTransit
services.AddMediator(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<FilterCommandConsumer>();
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
If you have any recommendation ideas thanks for sharing and challenging me 😊
Upvotes: 0
Views: 304
Reputation: 33540
Because you are using mediator, which uses the caller's async call context to deliver messages to consumers, you're basically creating a really nasty call stack and are deadlocking on the saga instance.
(controller) => (saga) => (consumer) => (saga, oops, it's locked by earlier saga)
Upvotes: 0