Amour Rashid
Amour Rashid

Reputation: 310

Using RabbitMQ in ASP.net core IHostedService

I am facing a problem that would like help. I am developing a background process that will be listening to a queue in rabbitmq server. It is OK if I run it in a .net core console application. However I would like to do it in a more elegant way such as web service (which has given me a lot of trouble where it does not work when installed) or an IIS hosted web application. I face a problem of Scoped Service when I try to host the service (IHostedService) in .net core web application.

The code below is working fine in a console application. How can make it run as an IHostedService in a .net core web application. What am I supposed to change. Your help is appreciated. CODE:

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using PaymentProcessor.Models;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.EntityFrameworkCore;

namespace PaymentProcessor
{
    public class PaymentProcessingService : HostedService
    {
        IConfiguration configuration;

        private EntitiesContext claimsContext;
        private string connectionString;

        private string HostName = "";
        private string UserName = "";
        private string Password = "";

        private static int MaxRetries;

        private IConnectionFactory factory;
        private IConnection connection;
        private IModel channel;

        public PaymentProcessingService(IConfiguration configuration)
        {
            this.configuration = configuration;

            this.connectionString = configuration.GetConnectionString ("StagingContext");
            claimsContext = new EntitiesContext(connectionString);

            HostName = this.configuration.GetValue<string>("Settings:HostName");
            UserName = this.configuration.GetValue<string>("Settings:UserName");
            Password = this.configuration.GetValue<string>("Settings:Password");
            MaxRetries = this.configuration.GetValue<string>("Settings:MaxRetries").ConvertTo<int>(); 
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {

            connect:
            factory = new ConnectionFactory { HostName = HostName, UserName = UserName, Password = Password };

            try
            {
                connection = factory.CreateConnection();
                channel = connection.CreateModel();

                channel.ExchangeDeclare("payment_rocessing_exchange", "topic");
                channel.QueueDeclare("payment_processing_queue", true, false, false, null);
                channel.QueueBind("payment_processing_queue", "payment_processing_exchange", "processing");


                var queueArgs = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "payment_processing_exchange" },
                        {"x-dead-letter-routing-key", "processing_retry"},
                        { "x-message-ttl", 10000 }
                    };

                channel.ExchangeDeclare("payment_rocessing_exchange", "topic");
                channel.QueueDeclare("payment_processing_retry_queue", true, false, false, queueArgs);
                channel.QueueBind("payment_processing_retry_queue", "payment_processing_exchange", "processing_retry", null);

                channel.ExchangeDeclare("payment_processing_exchange", "topic");
                channel.QueueDeclare("payment_processing_error_queue", true, false, false, null);
                channel.QueueBind("payment_processing_error_queue", "payment_processing_exchange", "processing_error", null);

                channel.ExchangeDeclare("payment_processing_exchange", "topic");
                channel.QueueDeclare("payment_integration_queue", true, false, false, null);
                channel.QueueBind("payment_integration_queue", "payment_processing_exchange", "integration", null);

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {


                    var message = ea.Body.DeSerializeText();
                    try
                    {
                        var saveBundle = JObject.Parse(message);
                        var msg = (dynamic)((dynamic)saveBundle).Message;
                        string referenceNo = (string)msg.ReferenceNo;

                        var parameters = new[]
                        {
                           new SqlParameter
                            {
                                DbType =  DbType.String,
                                ParameterName = "ReferenceNo",
                                Value =referenceNo
                            }
                        };

                        var result = claimsContext.Database.ExecuteSqlCommand("dbo.PaymentReferencesProcessSingle @ReferenceNo", parameters);

                        IBasicProperties props = channel.CreateBasicProperties();
                        props.Persistent = true;
                        props.ContentType = "text/plain";
                        props.DeliveryMode = 2;

                        channel.BasicPublish("payment_processing_exchange", "integration", props, (new MessageEnvelope { RetryCounts = 0, Message = JObject.FromObject(new { ReferenceNo = referenceNo }) }).Serialize()); 



                    }
                    catch (Exception ex)
                    {

                        MessageEnvelope envelope = JsonConvert.DeserializeObject<MessageEnvelope>(message);

                        if (envelope.RetryCounts < MaxRetries)
                        {
                            int RetryCounts = envelope.RetryCounts + 1;
                            MessageEnvelope messageEnvelope = new MessageEnvelope { RetryCounts = RetryCounts, Message = envelope.Message };
                            var data = messageEnvelope.Serialize();
                            channel.BasicPublish("payment_processing_exchange", "processing_retry", null, data);

                        }
                        else
                        {
                            var data = envelope.Serialize();
                            channel.BasicPublish("payment_processing_exchange", "processing_error", null, data);
                        }

                    }
                    finally
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }

                };

                channel.BasicConsume(queue: "payment_processing_queue", autoAck: false, consumer: consumer);

            }
            catch (Exception ex)
            {

                Thread.Sleep(10000);
                goto connect;
            }

        }
    }
}

and then

services.AddScoped<IHostedService, PaymentProcessingService>();

Upvotes: 0

Views: 8442

Answers (2)

user2809176
user2809176

Reputation: 1275

As Dekim mentioned a service should be registered.

Please have a look on an example I created on GitHub.

Program.cs looks like this:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;

namespace Core
{
    internal class Program
    {
        static async Task Main(string[] args)
        {

            await new HostBuilder()
                 .ConfigureServices((hostContext, services) =>
                 {
                     services.AddHostedService<ServiceRabbitMQ>(); // register our service here            
                 })
                .RunConsoleAsync();
        }
    }
}

Upvotes: 3

Because the IHostedService requires the creation of a special scope according to the documentation.

From the Microsoft documentation cited in the above answer :

No scope is created for a hosted service by default.

Consider using : services.AddHostedService<MyHostedService>();

Upvotes: 1

Related Questions