Tarek
Tarek

Reputation: 43

How to configure MassTransit with Mediator to publish messages?

I'm new with MassTransit and Mediator, I have a series of events to execute in consecutive order, I'm using MassTransit in-process and in-memory, for my use case no transport is required.

I want to send and publish messages to consumers, sagas, activities through Mediator, I have the code below, but I want to improve it by registering MassTransit in startup.cs:

//asp net core 3.1 Controller

 [ApiController]
 public class MyController : ControllerBase
 {    
    private readonly IProductService _productService ;
    private readonly IMediator mediator;
    public MyController(IProductService productService)
    {
       _productService = productService;
      var repository = new InMemorySagaRepository<ApiSaga>();
      mediator = Bus.Factory.CreateMediator(cfg =>
      {
          cfg.Saga<ProductSaga>(repository);
      });
    }
   
     [HttpPost]
     public async Task<IActionResult> Post([FromBody] ProductContract productContract)
     {            
         try
         {
             var result = await _productService.DoSomeThingAsync(productContract);
             await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
         return Ok();
         }
         catch (Exception ex)
         {
             return BadRequest(ex.Message);
         }
      }
 }

//My saga
public class ProductSaga :
        ISaga,
        InitiatedBy<ProductSubmittedEvent>
    {
        public Guid CorrelationId { get; set; }
        public string State { get; private set; } = "Not Started";

        public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
        {
            var label= context.Message.Label;
            State = "AwaitingForNextStep";
            //...
           //send next command
        }
    }

Like this it works but it's not proper, I want to configure masstransit with Mediator in my startup.cs to have one proper instance, to do that I started by deleting the IMediator, using an IPublishEndpoint to publish messages to Saga and configuring my startup.cs, but it doesn't work as expected:

//startup.cs

 public void ConfigureServices(IServiceCollection services)
 {
      services.AddMediator(cfg =>
            {
                cfg.AddSaga<ProductSaga>().InMemoryRepository();
            });
 }

//in controller using:
private readonly IPublishEndpoint _publishEndpoint;

//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
    new { CorrelationId = Guid.NewGuid(), result.Label });

I got a System.InvalidOperationException:

Unable to resolve service for type 'MassTransit.IPublishEndpoint' while attempting to activate 'GaaS.API.Controllers.ManageApiController'.

I tried to update my startup.cs:

var repository = new InMemorySagaRepository<ApiSaga>();
            services.AddMassTransit(cfg =>
            {
                cfg.AddBus(provider =>
                {
                    return Bus.Factory.CreateMediator(x =>
                    {
                        x.Saga<ProductSaga>(repository);
                    });
                });
            });

I got:

Cannot implicitly convert type 'MassTransit.Mediator.IMediator' to 'MassTransit.IBusControl'.

If you have any recommendation ideas thanks for sharing and challenging me 😊

Upvotes: 2

Views: 6255

Answers (2)

0909EM
0909EM

Reputation: 5027

I got to this from the (excellent) youtube video - MassTransit starting with Mediator, in that sample there is a line of code

AddMediator()

which I couldn't locate. I believe the following setup provides everything needed to get code working based on that video...

            services.AddMassTransit(config =>
            {
                config.AddRequestClient<ISubmitOrder>();
                config.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();

                config.UsingInMemory(ConfigureBus);
            });

and ConfigureBus is then:

        private void ConfigureBus(IBusRegistrationContext context, IInMemoryBusFactoryConfigurator configurator)
        {
            configurator.ConfigureEndpoints(context);
        }

I couldn't readily find this elsewhere, hence posting here.

Upvotes: 0

Chris Patterson
Chris Patterson

Reputation: 33268

The proper way to configure MassTransit Mediator in your project is through the Startup.cs file, which you seem to have tried.

public void ConfigureServices(IServiceCollection services)
{
    services.AddMediator(cfg =>
    {
        cfg.AddSaga<ProductSaga>().InMemoryRepository();
    });
}

Using mediator, you need to depend upon the IMediator interface. You cannot use IPublishEndpoint or ISendEndpointProvider, as those are bus interfaces. Since you can have both mediator and a bus instance in the container at the same time, this would lead to confusion when resolving services from the container.

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);
            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

If you are only using mediator, and want to use IPublishEndpoint, you could add that to the container yourself and delegate it.

services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());

Upvotes: 4

Related Questions