Bigtoe
Bigtoe

Reputation: 3510

MassTransit Losing Messages - Rabbit MQ - When publisher and consumer endpoint names are the same,

We've encountered a situation where MassTransit is losing messages if you create a publisher and consumer using the same endpoint name.

Note the code below; if I use a different endpoint name for either the consumer or publisher (e.g. "rabbitmq://localhost/mtlossPublised" for the publisher) then the message counts both published and consumed match; if I use the same endpoint name (as in the sample) then I get less messages consumed than published.

Is this expected behaviour? or am I doing something wrong, working sample code below.

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();
        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        string Message { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public string Message { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}

Upvotes: 4

Views: 3537

Answers (2)

Travis
Travis

Reputation: 10547

Bottom line, every instance of a bus needs it's own queue to read from. Even if the bus only exists to publish messages. This is just a requirement of how MassTransit works.

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - see the warning.

We leave the behaviour as undefined when two bus instances share the same queue. Regardless, it's not a condition we support. Each bus instance may send meta data to other bus instances, and requires it's own endpoint. This was a much bigger deal with MSMQ, so maybe we could get this case to work on RabbitMQ - but it's not something we've spent much thought into at this point.

Upvotes: 4

Binary Worrier
Binary Worrier

Reputation: 51709

What's happening is that in giving the same Receiver Uri you're telling MT to load balance consumption on the two busses, however you've only one bus listening to the messages.

If you get it to keep track of which messages are received you'll see it's (nearly) every second one.

Having tweaked your sample code I get

We consumed 6 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 3
Received 5
Received 6
Received 7
Received 8

Start a consumer on the other bus and you'll get them all

We consumed 10 simple messages. Press Enter to terminate the applicaion.
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9

So yes, I'd say this is expected behaviour.

Here's the tweaked sample code with two subscribers

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        internal static bool[] msgReceived = new bool[10];
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            var publisherBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });
            publisherBus.SubscribeConsumer(() => new MessageConsumer());
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                consumerBus.Publish(new SimpleMessage()
                    {CorrelationId = Guid.NewGuid(), MsgId = i});
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.",
                              MessageConsumer.Count);
            for (int i = 0; i < 10; i++)
                if (msgReceived[i])
                    Console.WriteLine("Received {0}", i);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();

        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        int MsgId { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public int MsgId { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            Program.msgReceived[message.MsgId] = true;
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}

Upvotes: 1

Related Questions