sandy
sandy

Reputation: 163

RabbitMQ consumer as a windows service

I have a rabbitmq consumer application implementing "publish/subscribe pattern in .net, which runs perfectly as a console application but when I deploy that as a windows service it does not seem to be saving the data into mongodb.

    protected override void OnStart(string[] args)
    {
        try
        {
             var connectionString = "mongodb://localhost";
            var client = new MongoClient(connectionString);
            var factory = new ConnectionFactory() { HostName = "localhost" };            
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "test", type: "fanout");
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName,                                       exchange: "logs", routingKey: "");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        BsonDocument document = BsonDocument.Parse(message);
                        var database = client.GetDatabase("test");
                        var collection = database.GetCollection<BsonDocument>("test_collection");
                        collection.InsertOneAsync(document);
                    };
                    channel.BasicConsume(queue: queueName,                                       noAck: true,consumer: consumer);

                }
            }
        }
        catch (Exception ex)
        {
            throw;
        }
    }

Is there something that I'm missing?

Upvotes: 4

Views: 13448

Answers (4)

Soroush Karimi
Soroush Karimi

Reputation: 413

the answer below helps me to fix this problem. as mentioned above you shouldn't use using statement which doesn't return, in the OnStart method. so you can get a single message in the OnStart method but you cant declare consumer by help the using statement.

the solution which fixes the problem for me

I hope this helps you too

Upvotes: 0

Today we need made RabbitMQ consumer as a windows service and solve with Timer in method OnStart.

private Timer _timer;

protected override void OnStart(string[] args)

{
     _timer = new Timer();
     _timer.Interval = 5000; 
     _timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
     _timer.Start();
}

public void OnTimer(object sender, System.Timers.ElapsedEventArgs args)
{
     _timer.Enabled = false;

     ...
}

Many thanks for the help and hope to have helped with this solution too

Upvotes: 2

Svet Angelov
Svet Angelov

Reputation: 799

It's a bad idea to have a busy wait in OnStart() because the operating system will be expecting a return from it. Read here: https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx

Edit: The problem with the code above is that you have your connection and channel in the using statements. The whole point of doing that is to Dispose them once out of scope. So in this case even though you're adding a event handler, you're shortly after exiting out of scope and disposing of the channel, etc. To fix this, pull the connection, channel, and consumer out of the `OnStart' method and make them class (probably private) members. That should keep them open even when you exit the method and your event should keep listening.

Upvotes: 15

sandy
sandy

Reputation: 163

Making the following changes to my Onstart method did the trick

    protected override void OnStart(string[] args)
    {

        ConnectionFactory factory = new ConnectionFactory { HostName = localhost" };
        var connectionString = "mongodb://localhost";
        var client = new MongoClient(connectionString);


        using (IConnection connection = factory.CreateConnection())
        {
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "test", type: "fanout");

                string queueName = channel.QueueDeclare();

                channel.QueueBind(queueName, "test", "");

                this.EventLog.WriteEntry("Waiting for messages");

                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                while (true)
                {
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(e.Body);
                    BsonDocument document = BsonDocument.Parse(message);
                    var database = client.GetDatabase("test");
                    var collection = database.GetCollection<BsonDocument>("test_collection");
                    collection.InsertOneAsync(document);

                }
            }
        }
    }

Upvotes: -2

Related Questions