Jamil
Jamil

Reputation: 940

c# - DbContext gets disposed in BackgroundService

I have a WebAPI that should also receive messages from RabbitMQ. I used this tutorial, because I know that sometimes IIS likes to kill long-running tasks (didn't test it on server yet though, maybe it won't work). I have a service that handles messages that are received via RabbitMQ. First problem I met - I couldn't inject it into BackgroundService class, so I used IServiceScopeFactory. Now, I have to consume messages from two queues, and as I understood, best practice is to use two channels for this. But the handling is done in one service. BackgroundService:

public class ConsumeRabbitMQHostedService : BackgroundService
{
    private IConnection _connection;
    private IModel _firstChannel;
    private IModel _secondChannel;
    private RabbitConfigSection _rabbitConfig;
    public IServiceScopeFactory _serviceScopeFactory;

    public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
    {
        _rabbitConfig = rabbitConfig.Value;
        _serviceScopeFactory = serviceScopeFactory;
        InitRabbitMQ();
    }

    private void InitRabbitMQ()
    {
        var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };

        
        _connection = factory.CreateConnection();

        
        _firstChannel = _connection.CreateModel();

        _firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
        _firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _firstChannel.BasicQos(0, 1, false);

        _secondChannel = _connection.CreateModel();

        _secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
        _secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _secondChannel.BasicQos(0, 1, false);

        _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var firstConsumer = new EventingBasicConsumer(_firstChannel);
        var secondConsumer = new EventingBasicConsumer(_secondChannel);
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            firstConsumer.Received += (ch, ea) =>
            {
                // received message  
                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleFirstMessage(content, scoped);
                _firstChannel.BasicAck(ea.DeliveryTag, false);

            };
            firstConsumer.Shutdown += OnConsumerShutdown;
            firstConsumer.Registered += OnConsumerRegistered;
            firstConsumer.Unregistered += OnConsumerUnregistered;
            firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            _firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
        }
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            secondConsumer.Received += (ch, ea) =>
            {
                // received message  

                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleSecondMessage(content, scoped);
                _secondChannel.BasicAck(ea.DeliveryTag, false);
            };


            secondConsumer.Shutdown += OnConsumerShutdown;
            secondConsumer.Registered += OnConsumerRegistered;
            secondConsumer.Unregistered += OnConsumerUnregistered;
            secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;

            _secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
        }
        return Task.CompletedTask;
    }

    private void HandleFirstMessage(string content, IIntegrationService integrationService)
    {
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }

    private void HandleSecondMessage(string content, IIntegrationService integrationService)
    {
        List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
        integrationService.ImportSecond(importData);
    }

    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

    public override void Dispose()
    {
        _firstChannel.Close();
        _connection.Close();
        base.Dispose();
    }
}

In service I get

System.ObjectDisposedException: 'Cannot access a disposed context instance. A common cause of this error is disposing a context instance that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling 'Dispose' on the context instance, or wrapping it in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances. Object name: 'IntegrationDbContext'.'

DbContext is injected into IIntegrationService. If I understand what's happening, two instances of the service(or even one) share DbContext, and when one of them finishes it disposes DbContext. I tried not to create two instances (all code inside one using), tried making IIntegrationService transient, tried doing everything asynchronously (it was initial version, made it synchronous to test) - still same error. What should I do here? And is this the right approach?

Update 1. ConfigureServices in Startup:

        public void ConfigureServices(IServiceCollection services)
    {
        var rabbitConfigSection =
            Configuration.GetSection("Rabbit");
        services.Configure<RabbitConfigSection>(rabbitConfigSection);
        services.AddDbContext<SUNDbContext>(options =>
               options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));

        services.AddCors();
        services.AddSwaggerGen(c =>
        {
            c.SwaggerDoc("v1", new OpenApiInfo
            {
                Title = "My API",
                Version = "v1"
            });
        });
        services.AddRabbit(Configuration);
        services.AddHostedService<ConsumeRabbitMQHostedService>();
        services.AddControllers();
        services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
    }

Upvotes: 5

Views: 4684

Answers (1)

David L
David L

Reputation: 33853

The issue is caused by the fact that the outer scope created by _serviceScopeFactory.CreateScope() is disposed after each using statement, whereas each message is still trying to rely on the now disposed scope and attached context to process the message.

The solution is to create a new scope per message in your message handlers:

private void HandleFirstMessage(string content)
{
    using (var scope = _serviceScopeFactory.CreateScope())
    {
        IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }
}

Upvotes: 6

Related Questions