Martin


MassTransit: EF Core outbox not persisting messages

I have been googling this for a while and can't find a solution. As far as I can tell, my setup lines up with the MassTransit docs. MassTransit works perfectly in my project, but as soon as I add the SQL Server outbox, nothing is written to the outbox tables and no messages are published. Here is how I register MassTransit:

_services.AddMassTransit(busConfigurator =>
{
    busConfigurator.SetKebabCaseEndpointNameFormatter();
    busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();

    busConfigurator.AddEntityFrameworkOutbox<SomeDbContext>(options =>
    {
        options.UseSqlServer();
        options.UseBusOutbox();
    });

    busConfigurator.AddConfigureEndpointsCallback((IRegistrationContext context, string name, IReceiveEndpointConfigurator cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<SomeDbContext>(context);
    });

    busConfigurator.UsingRabbitMq((context, configurator) =>
    {
        var settings = context.GetRequiredService<MessageBrokerSettings>();

        configurator.Host(new Uri(settings.Host), h =>
        {
            h.Username(settings.Username);
            h.Password(settings.Password);
        });

        configurator.ConfigureEndpoints(context);
    });
});

I also have this in the DbContext's OnModelCreating:

modelBuilder.AddTransactionalOutboxEntities();

All of my publishing goes through an EF Core interceptor that captures the domain events emitted by the model:

public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, CancellationToken cancellationToken = default)
{
    var bus = _serviceScope.ServiceProvider.GetRequiredService<IEventBus>();

    var context = eventData.Context;

    if (context is null)
        return await base.SavedChangesAsync(eventData, result, cancellationToken);

    var events = context.ChangeTracker
        .Entries<AggregateRoot>()
        .Select(a => a.Entity)
        .Where(e => e.GetDomainEvents() is not null)
        .SelectMany(e =>
        {
            var domainEvents = e.GetDomainEvents();

            e.ClearDomainEvents();

            return domainEvents;
        });

    foreach(var domainEvent in events)
        await bus.PublishAsync(domainEvent, cancellationToken);

    return await base.SavedChangesAsync(eventData, result, cancellationToken);
}

Lastly, my IEventBus implementation looks like this:

public async Task PublishAsync(object message, CancellationToken cancellationToken = default)
{
    _logger.LogDebug("Publishing event {@Event}", message);

    await _endpoint.Publish(message, (context) =>
    {
        context.Headers.Set("jwtToken", _contextAccessor.GetJwtTokenFromAuthHeader());
    }, cancellationToken);
}

Here, _endpoint is an IPublishEndpoint injected into that class.

Again, everything works perfectly without the outbox. What am I doing wrong?
