Reputation: 43
I'm new to MassTransit and its Mediator. I have a series of events to execute in consecutive order, and I'm using MassTransit in-process and in-memory; my use case requires no transport.
I want to send and publish messages to consumers, sagas, and activities through Mediator. The code below works, but I want to improve it by registering MassTransit in startup.cs:
//asp net core 3.1 Controller
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator mediator;
public MyController(IProductService productService)
{
_productService = productService;
var repository = new InMemorySagaRepository<ProductSaga>();
mediator = Bus.Factory.CreateMediator(cfg =>
{
cfg.Saga<ProductSaga>(repository);
});
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
//My saga
public class ProductSaga :
ISaga,
InitiatedBy<ProductSubmittedEvent>
{
public Guid CorrelationId { get; set; }
public string State { get; private set; } = "Not Started";
public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
{
var label = context.Message.Label;
State = "AwaitingForNextStep";
//...
//send next command
return Task.CompletedTask;
}
}
This works, but it isn't clean: I want to configure MassTransit with Mediator in my startup.cs so that there is a single shared instance. To do that, I removed the IMediator field, used an IPublishEndpoint to publish messages to the saga, and configured my startup.cs, but it doesn't work as expected:
//startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
//in controller using:
private readonly IPublishEndpoint _publishEndpoint;
//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
new { CorrelationId = Guid.NewGuid(), result.Label });
I got a System.InvalidOperationException:
Unable to resolve service for type 'MassTransit.IPublishEndpoint' while attempting to activate 'GaaS.API.Controllers.ManageApiController'.
I then tried to update my startup.cs:
var repository = new InMemorySagaRepository<ProductSaga>();
services.AddMassTransit(cfg =>
{
cfg.AddBus(provider =>
{
return Bus.Factory.CreateMediator(x =>
{
x.Saga<ProductSaga>(repository);
});
});
});
I got:
Cannot implicitly convert type 'MassTransit.Mediator.IMediator' to 'MassTransit.IBusControl'.
If you have any recommendations or ideas, thanks for sharing them and for challenging me 😊
Upvotes: 2
Views: 6255
Reputation: 5027
I got here from the (excellent) YouTube video "MassTransit starting with Mediator". That sample contains a call to AddMediator(), which I couldn't locate. I believe the following setup provides everything needed to get the code from that video working:
services.AddMassTransit(config =>
{
config.AddRequestClient<ISubmitOrder>();
config.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
config.UsingInMemory(ConfigureBus);
});
and ConfigureBus is then:
private void ConfigureBus(IBusRegistrationContext context, IInMemoryBusFactoryConfigurator configurator)
{
configurator.ConfigureEndpoints(context);
}
I couldn't readily find this elsewhere, hence posting here.
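For completeness, here is a rough sketch of how a controller might use the request client registered above. The IOrderSubmissionAccepted response contract, the OrderController name, and the OrderId property on ISubmitOrder are assumptions for illustration; adjust them to whatever your SubmitOrderConsumer actually responds with.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

// Hypothetical response contract, not part of the setup above.
public interface IOrderSubmissionAccepted
{
    Guid OrderId { get; }
}

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly IRequestClient<ISubmitOrder> _client;

    // Resolved from the container thanks to config.AddRequestClient<ISubmitOrder>().
    public OrderController(IRequestClient<ISubmitOrder> client)
    {
        _client = client;
    }

    [HttpPost]
    public async Task<IActionResult> Post(Guid orderId)
    {
        // Assumes ISubmitOrder exposes an OrderId property and that the
        // consumer responds with IOrderSubmissionAccepted.
        Response<IOrderSubmissionAccepted> response =
            await _client.GetResponse<IOrderSubmissionAccepted>(new { OrderId = orderId });

        return Ok(response.Message);
    }
}
```

If the consumer never responds with the requested type, GetResponse will eventually fail with a request timeout, so the consumer and the response contract need to match.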
Upvotes: 0
Reputation: 33268
The proper way to configure MassTransit Mediator in your project is through the Startup.cs file, which you seem to have tried.
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
When using mediator, you need to depend upon the IMediator interface. You cannot use IPublishEndpoint or ISendEndpointProvider, as those are bus interfaces. Since you can have both a mediator and a bus instance in the container at the same time, registering those interfaces for mediator as well would lead to confusion when resolving services from the container.
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator _mediator;
public MyController(IProductService productService, IMediator mediator)
{
_productService = productService;
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
If you are only using mediator and want to use IPublishEndpoint, you could add it to the container yourself and delegate it to the mediator.
services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());
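Put together, the registration might look like the sketch below. It assumes the ProductSaga from the question, that no bus is registered via AddMassTransit in the same container, and that IMediator is registered as a singleton by AddMediator (check this against your MassTransit version). GetRequiredService is used here instead of GetService so that a missing mediator registration fails loudly; that substitution is a suggestion, not a requirement.

```csharp
using MassTransit;
using MassTransit.Mediator;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // Register the mediator and the saga with an in-memory repository.
        services.AddMediator(cfg =>
        {
            cfg.AddSaga<ProductSaga>().InMemoryRepository();
        });

        // Only appropriate when mediator is the sole MassTransit instance:
        // any IPublishEndpoint resolved from the container is the mediator.
        services.AddSingleton<IPublishEndpoint>(provider =>
            provider.GetRequiredService<IMediator>());
    }
}
```

With this in place, a controller can take IPublishEndpoint in its constructor and the published messages go through the mediator. If you later add a real bus, remove the delegate registration first, otherwise it is ambiguous which one a given IPublishEndpoint refers to.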
Upvotes: 4