Alessandro
Alessandro

Reputation: 3760

UnitOfWork and DbContext: thread safety with DI

I'm working on a .NET Core 2.2 Console Application that hosts an IHostedService:

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IPositionService positionService)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.positionService = positionService;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        mqttClient = new MqttFactory().CreateMqttClient();
        mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
        mqttClient.ApplicationMessageReceived +=
            async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);

        await mqttClient.ConnectAsync(
            new MqttClientOptionsBuilder()
                .WithTcpServer(config.Value.Host, config.Value.Port).Build());
    }

    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        await positionService.HandleMessage(message);
    }

    [...]
}

This IPositionService is a manager that inspects the message and checks if it can be saved inside our database:

public class PositionService : IPositionService
{
    [...]

    public PositionService(
        IUnitOfWork unitOfWork, ILogger<PositionService> logger)
    {
        this.unitOfWork = unitOfWork;
        this.logger = logger;
    }

    public async Task HandleMessage(string message)
    {
        Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);

        [...]

        await unitOfWork.EntityRepository.UpdateAsync(entity);
        await unitOfWork.Save();
    }

    [...]
}

IUnitOfWork is a wrapper around Entity Framework Core DbContext (please don't judge me, I have reasons to do this):

public class UnitOfWork : IUnitOfWork
{
    [...]

    public UnitOfWork(MyContext myContext)
    {
        this.myContext = myContext;

        EntityRepository = new EFRepository<Entity>(myContext);
    }

    public async Task Save()
    {
        await myContext.SaveChangesAsync();
    }
}

EFRepository<T>, that implements IRepository<T> interface, is a wrapper around DbSet<T> (again, please don't judge me). No relevant code here.

Console Application's Program.cs is configured like that:

[...]
.ConfigureServices((hostContext, services) =>
{
    services.AddDbContext<MyContext>(
        c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
        ServiceLifetime.Transient);

    services.AddTransient<IPositionService, PositionService>();
    services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
    services.AddTransient<IUnitOfWork, UnitOfWork>();

    services.AddHostedService<MqttClientHostedService>();

    [...]
});

Problem is that PositionService.HandleMessage is being called many times per second, and being that DbContext is not thread safe I get this error message:

A second operation started on this context before a previous operation completed.

I solved this issue by removing IUnitOfWork from PositionService's dependencies, injecting instead an IServiceScopeFactory, and doing:

using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
    IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();

    [...]
}

This way works, but I don't like it. It seems like a trick, and I don't like the fact that my PositionService knows about Dependency Injection and has to deal with scopes.

My question is: there's a better way to solve this problem without touching my classes? Should I make the whole UnitOfWork thread safe? Or maybe create it by hand without using DI?

Upvotes: 0

Views: 883

Answers (1)

Steven
Steven

Reputation: 172826

The source of the problem is that MyContext is held captive as a Captive Dependency in the following object graph:

MqttClientHostedService
    -> PositionService
        -> UnitOfWork
            -> MyContext 

All types in this graph are registered as Transient, but still, services that act as hosted service (e.g. your MqttClientHostedService) are resolved only once for the duration of the application and cached indefinately. This effectively makes them a singleton.

In other words, MyContext is accidentally kept alive by the single MqttClientHostedService and because multiple messages can come in in parallel, you have yourself a race condition.

The solution is to let each ApplicationMessageReceived event run in its own unique little bubble (a scope) and resolve a new IPositionService from within that bubble. For instance:

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IServiceProvider provider)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.provider = provider;
    }
    [...]
    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        using (var scope = provider.CreateScope())
        {
            positionService = scope.ServiceProvider
                .GetRequiredService<IPositionService>();
            string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            await positionService.HandleMessage(message);
        }
    }

    [...]
}

Upvotes: 2

Related Questions