Reputation: 3760
I'm working on a .NET Core 2.2 Console Application that hosts an IHostedService:
public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger,
        IOptions<MqttClientConfiguration> mqttConfiguration,
        IPositionService positionService)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.positionService = positionService;
    }
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        mqttClient = new MqttFactory().CreateMqttClient();
        mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
        mqttClient.ApplicationMessageReceived +=
            async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);
        await mqttClient.ConnectAsync(
            new MqttClientOptionsBuilder()
                .WithTcpServer(config.Value.Host, config.Value.Port).Build());
    }
    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        await positionService.HandleMessage(message);
    }
    [...]
}
This IPositionService is a manager that inspects the message and checks if it can be saved inside our database:
public class PositionService : IPositionService
{
    [...]
    public PositionService(
        IUnitOfWork unitOfWork, ILogger<PositionService> logger)
    {
        this.unitOfWork = unitOfWork;
        this.logger = logger;
    }
    public async Task HandleMessage(string message)
    {
        Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);
        [...]
        await unitOfWork.EntityRepository.UpdateAsync(entity);
        await unitOfWork.Save();
    }
    [...]
}
IUnitOfWork is a wrapper around Entity Framework Core DbContext (please don't judge me, I have reasons to do this):
public class UnitOfWork : IUnitOfWork
{
    [...]
    public UnitOfWork(MyContext myContext)
    {
        this.myContext = myContext;
        EntityRepository = new EFRepository<Entity>(myContext);
    }
    public async Task Save()
    {
        await myContext.SaveChangesAsync();
    }
}
EFRepository<T>, which implements the IRepository<T> interface, is a wrapper around DbSet<T> (again, please don't judge me). No relevant code here.
The Console Application's Program.cs is configured like this:
[...]
.ConfigureServices((hostContext, services) =>
{
    services.AddDbContext<MyContext>(
        c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
        ServiceLifetime.Transient);
    services.AddTransient<IPositionService, PositionService>();
    services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
    services.AddTransient<IUnitOfWork, UnitOfWork>();
    services.AddHostedService<MqttClientHostedService>();
    [...]
});
The problem is that PositionService.HandleMessage is called many times per second, and because DbContext is not thread safe I get this error message:
A second operation started on this context before a previous operation completed.
I solved this issue by removing IUnitOfWork from PositionService's dependencies, injecting an IServiceScopeFactory instead, and doing:
using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
    IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();
    [...]
}
This works, but I don't like it. It feels like a trick, and I don't like that my PositionService knows about Dependency Injection and has to deal with scopes.
My question is: is there a better way to solve this problem without touching my classes? Should I make the whole UnitOfWork thread safe? Or maybe create it by hand without using DI?
Reputation: 172826
The source of the problem is that MyContext has become a Captive Dependency in the following object graph:
MqttClientHostedService
-> PositionService
-> UnitOfWork
-> MyContext
All types in this graph are registered as Transient, but services that act as hosted services (e.g. your MqttClientHostedService) are resolved only once for the duration of the application and cached indefinitely. This effectively makes them singletons.
In other words, MyContext is accidentally kept alive by the single MqttClientHostedService, and because multiple messages can arrive in parallel, several operations end up running on that one DbContext at the same time: you have yourself a race condition.
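To make the captive dependency concrete, here is a minimal sketch that uses only Microsoft.Extensions.DependencyInjection. The types FakeContext, FakeConsumer and LongLivedHost are hypothetical stand-ins for MyContext, PositionService and the hosted service, and the host is registered as a singleton purely to imitate "resolved once and cached"; it is not how AddHostedService is implemented.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for MyContext.
public class FakeContext { }

// Hypothetical stand-in for PositionService / UnitOfWork.
public class FakeConsumer
{
    public FakeContext Context { get; }
    public FakeConsumer(FakeContext context) => Context = context;
}

// Hypothetical stand-in for the hosted service: built once, lives forever.
public class LongLivedHost
{
    public FakeConsumer Consumer { get; }
    public LongLivedHost(FakeConsumer consumer) => Consumer = consumer;
}

public static class CaptiveDependencyDemo
{
    public static ServiceProvider BuildProvider()
    {
        var services = new ServiceCollection();
        services.AddTransient<FakeContext>();   // transient, like MyContext
        services.AddTransient<FakeConsumer>();  // transient, like PositionService
        services.AddSingleton<LongLivedHost>(); // assumption: imitates the cached host
        return services.BuildServiceProvider();
    }

    public static void Main()
    {
        using (var provider = BuildProvider())
        {
            var host1 = provider.GetRequiredService<LongLivedHost>();
            var host2 = provider.GetRequiredService<LongLivedHost>();

            // The long-lived host captured one context and keeps reusing it.
            Console.WriteLine(
                ReferenceEquals(host1.Consumer.Context, host2.Consumer.Context)); // True

            // Resolving the transient directly does yield a new instance.
            var fresh = provider.GetRequiredService<FakeContext>();
            Console.WriteLine(
                ReferenceEquals(fresh, host1.Consumer.Context)); // False
        }
    }
}
```

The Transient registration only guarantees a new instance per resolution. Once the long-lived object has been built, nothing resolves its dependencies again, so every message goes through the same context.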
The solution is to let each ApplicationMessageReceived event run in its own unique little bubble (a scope) and resolve a new IPositionService from within that bubble. For instance:
// CreateScope and GetRequiredService are extension methods that need:
// using Microsoft.Extensions.DependencyInjection;
public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger,
        IOptions<MqttClientConfiguration> mqttConfiguration,
        IServiceProvider provider)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.provider = provider;
    }
    [...]
    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        // One scope per message: everything resolved here is private to this message.
        using (var scope = provider.CreateScope())
        {
            var positionService = scope.ServiceProvider
                .GetRequiredService<IPositionService>();
            string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            await positionService.HandleMessage(message);
        }
    }
    [...]
}
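As a rough, self-contained illustration of what the scope buys you, the sketch below resolves a handler inside a fresh scope for each simulated message. DisposableFakeContext and FakeHandler are hypothetical stand-ins for MyContext and PositionService. The context is registered as Scoped here as an assumption for the demo (that is also the default lifetime of AddDbContext); the Transient registrations from the question would work with the scope-per-message approach as well.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for MyContext; records whether it was disposed.
public class DisposableFakeContext : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Hypothetical stand-in for PositionService.
public class FakeHandler
{
    public DisposableFakeContext Context { get; }
    public FakeHandler(DisposableFakeContext context) => Context = context;
}

public static class ScopePerMessageDemo
{
    public static ServiceProvider BuildProvider()
    {
        var services = new ServiceCollection();
        services.AddScoped<DisposableFakeContext>(); // assumption: one context per scope
        services.AddTransient<FakeHandler>();
        return services.BuildServiceProvider();
    }

    // Mirrors the event handler: one scope per incoming message.
    // Returns the context that was used so the caller can inspect it afterwards.
    public static DisposableFakeContext HandleOneMessage(IServiceProvider provider)
    {
        using (var scope = provider.CreateScope())
        {
            var handler = scope.ServiceProvider.GetRequiredService<FakeHandler>();
            return handler.Context;
        }
    }

    public static void Main()
    {
        using (var provider = BuildProvider())
        {
            var first = HandleOneMessage(provider);
            var second = HandleOneMessage(provider);

            // Each message worked against its own context instance...
            Console.WriteLine(ReferenceEquals(first, second)); // False
            // ...and the container disposed it when the scope ended.
            Console.WriteLine(first.IsDisposed);               // True
        }
    }
}
```

Because the scope owns the instances it creates, disposing it at the end of the handler also disposes the context, so nothing from one message leaks into the next.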