I have been googling this for a while and can't find a solution. As far as I can tell, my setup lines up with the MassTransit docs. MassTransit works perfectly in my project, but as soon as I add the SQL Server outbox, nothing is written to the outbox tables and no publishing takes place. Here is how I register MassTransit:
    _services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.SetKebabCaseEndpointNameFormatter();
        busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();

        busConfigurator.AddEntityFrameworkOutbox<SomeDbContext>(options =>
        {
            options.UseSqlServer();
            options.UseBusOutbox();
        });

        busConfigurator.AddConfigureEndpointsCallback((IRegistrationContext context, string name, IReceiveEndpointConfigurator cfg) =>
        {
            cfg.UseEntityFrameworkOutbox<SomeDbContext>(context);
        });

        busConfigurator.UsingRabbitMq((context, configurator) =>
        {
            var settings = context.GetRequiredService<MessageBrokerSettings>();

            configurator.Host(new Uri(settings.Host), h =>
            {
                h.Username(settings.Username);
                h.Password(settings.Password);
            });

            configurator.ConfigureEndpoints(context);
        });
    });
I also have this in the DbContext's OnModelCreating:

    modelBuilder.AddTransactionalOutboxEntities();
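For context, here is a minimal sketch of where that call sits. The constructor shape and the `base.OnModelCreating` call are assumptions for illustration; only the `AddTransactionalOutboxEntities` call is taken from the actual project. As far as I understand it, this call maps MassTransit's inbox state, outbox state and outbox message entities, so a migration is still needed to create the tables.

```csharp
using MassTransit;
using Microsoft.EntityFrameworkCore;

public class SomeDbContext : DbContext
{
    // Constructor shape is assumed for illustration.
    public SomeDbContext(DbContextOptions<SomeDbContext> options)
        : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // Maps the inbox state, outbox state and outbox message entities
        // used by the MassTransit transactional outbox.
        modelBuilder.AddTransactionalOutboxEntities();
    }
}
```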
All my publishing goes through an EF Core interceptor that captures the domain events emitted by the model:
    public override async ValueTask<int> SavedChangesAsync(
        SaveChangesCompletedEventData eventData,
        int result,
        CancellationToken cancellationToken = default)
    {
        var bus = _serviceScope.ServiceProvider.GetRequiredService<IEventBus>();
        var context = eventData.Context;

        if (context is null)
            return await base.SavedChangesAsync(eventData, result, cancellationToken);

        var events = context.ChangeTracker
            .Entries<AggregateRoot>()
            .Select(a => a.Entity)
            .Where(e => e.GetDomainEvents() is not null)
            .SelectMany(e =>
            {
                var domainEvents = e.GetDomainEvents();
                e.ClearDomainEvents();
                return domainEvents;
            });

        foreach (var domainEvent in events)
            await bus.PublishAsync(domainEvent, cancellationToken);

        return await base.SavedChangesAsync(eventData, result, cancellationToken);
    }
Lastly, my IEventBus implementation looks like this:
    public async Task PublishAsync(object message, CancellationToken cancellationToken = default)
    {
        _logger.LogDebug("Publishing event {@Event}", message);

        await _endpoint.Publish(message, (context) =>
        {
            context.Headers.Set("jwtToken", _contextAccessor.GetJwtTokenFromAuthHeader());
        }, cancellationToken);
    }
Here, _endpoint is an IPublishEndpoint injected into that class.
Again, everything works perfectly without the outbox. What am I doing wrong?