Reputation: 5749
I'm working on a proof of concept of topic-based routing in RabbitMQ in C#/.NET (using the EasyNetQ library). For my test, I have a single exchange bound to two durable queues by topic routing ("TopicA" and "TopicB").
Here is the code for the producer (a C# console application):
using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
    Random random = new Random();
    Foo foo; // my test message class
    for (int i = 0; i < 100; i++)
    {
        int coin = random.Next(0, 2);
        if (coin == 0)
        {
            foo = new Foo() { Payload = "Heads" };
            bus.Publish(foo, "TopicA");
            Console.WriteLine($"Published message {i} to TopicA.");
        }
        else
        {
            foo = new Foo() { Payload = "Tails" };
            bus.Publish(foo, "TopicB");
            Console.WriteLine($"Published message {i} to TopicB.");
        }
    }
}
Here is my consumer code (also a C# console application):
class Program
{
    static void Main(string[] args)
    {
        TestRabbitMQSubscribe();
        Console.ReadKey(false);
    }

    private static void TestRabbitMQSubscribe()
    {
        using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
        {
            bus.Subscribe<Foo>("TopicA", HandleFooA, config => config.WithTopic("TopicA"));
            bus.Subscribe<Foo>("TopicB", HandleFooB, config => config.WithTopic("TopicB"));
        }
    }

    private static void HandleFooA(Foo foo)
    {
        Console.WriteLine($"Received {foo.Payload} from TopicA.");
        File.AppendAllText(@"c:\heads.txt", foo.Payload + Environment.NewLine);
    }

    private static void HandleFooB(Foo foo)
    {
        Console.WriteLine($"Received {foo.Payload} from TopicB.");
        File.AppendAllText(@"c:\tails.txt", foo.Payload + Environment.NewLine);
    }
}
The producer code runs with no issues, and I can verify afterward (using the RabbitMQ admin UI) that the TopicA queue contains (for example) 47 messages while the TopicB queue contains 53, as expected.
I then run the consumer code and it appears to pull in all of the messages in the TopicA queue and write them to the appropriate file. However, it will then either only receive a handful of the messages in the TopicB queue or none at all. It will then stop at the Console.ReadKey() call.
If I reverse the order of the bus.Subscribe() calls, it will pull in the messages from TopicB, but not TopicA.
I feel like I must be missing something simple (e.g. a blocking call) or I'm fundamentally misunderstanding some RabbitMQ or EasyNetQ concept.
Upvotes: 1
Views: 3365
Reputation: 2363
This works with just a few tweaks to your code, mostly around where to put a ReadLine() so that neither program exits (or disposes its bus) too early.
I think the main issue is in the Consumer: Subscribe() returns immediately and messages are delivered in the background, so the bus has to stay alive while they arrive. In your code the using block ends, disposing the bus and its subscriptions, before Console.ReadKey() is reached, so only the messages handled before disposal come through. Putting the blocking call inside the using block fixes that.
You can set both Producer and Consumer as startup projects in the solution and run them simultaneously, or run the Producer first and the Consumer some time later. Since the queues are durable and you already see the messages waiting in the admin UI, they should stay queued even after the Producer exits; the ReadLine() in the Producer below just keeps its console window open.
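To make the lifetime point concrete, here is a minimal sketch of why leaving the using block stops delivery. FakeBus is a hypothetical stand-in written only for this illustration; it is not EasyNetQ's API and involves no broker. It only assumes what the real bus also does: once disposed, its subscriptions no longer receive anything.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a message bus; this is NOT EasyNetQ's API.
public sealed class FakeBus : IDisposable
{
    private readonly List<Action<string>> _handlers = new List<Action<string>>();

    // Registers a handler; it stays active only while the bus is alive.
    public void Subscribe(Action<string> handler) => _handlers.Add(handler);

    // Simulates the broker pushing one message to every active handler.
    public void Deliver(string message)
    {
        foreach (var handler in _handlers) handler(message);
    }

    // Disposing drops every subscription, so later deliveries reach nobody.
    public void Dispose() => _handlers.Clear();
}

public static class LifetimeDemo
{
    // Mirrors the question: the using block ends before the messages arrive.
    public static int SubscribeThenLeaveScope()
    {
        var received = 0;
        var bus = new FakeBus();
        using (bus)
        {
            bus.Subscribe(_ => received++);
        } // bus is disposed here, taking the subscription with it
        bus.Deliver("Heads");
        return received;
    }

    // Mirrors the answer: messages arrive while still inside the using block.
    public static int SubscribeAndStayInScope()
    {
        var received = 0;
        using (var bus = new FakeBus())
        {
            bus.Subscribe(_ => received++);
            bus.Deliver("Heads"); // stands in for waiting on Console.ReadLine()
            bus.Deliver("Tails");
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(SubscribeThenLeaveScope()); // prints 0
        Console.WriteLine(SubscribeAndStayInScope()); // prints 2
    }
}
```

With the real library the disposal races against background delivery, which would explain why the first subscription drains its queue while the second gets only a handful of messages or none.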
Posting code in its entirety just to make sure:
Producer:
using System;
using EasyNetQ;
using Messages;

namespace Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz"))
            {
                var random = new Random();
                for (var i = 1; i <= 100; ++i)
                {
                    var coin = random.Next(0, 2);
                    if (coin == 0)
                    {
                        bus.Publish(new CoinFlipMessage { Payload = "Heads" }, CoinFlipMessage.HeadsTopic);
                        Console.WriteLine($"Published message {i} to {CoinFlipMessage.HeadsTopic}.");
                    }
                    else
                    {
                        bus.Publish(new CoinFlipMessage { Payload = "Tails" }, CoinFlipMessage.TailsTopic);
                        Console.WriteLine($"Published message {i} to {CoinFlipMessage.TailsTopic}.");
                    }
                }
                Console.ReadLine();
            }
        }
    }
}
Consumer:
using System;
using System.IO;
using EasyNetQ;
using Messages;

namespace Consumer
{
    class Program
    {
        private static int headsCount;
        private static int tailsCount;

        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz"))
            {
                bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.HeadsTopic, HandleHeads, config => config.WithTopic(CoinFlipMessage.HeadsTopic));
                bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.TailsTopic, HandleTails, config => config.WithTopic(CoinFlipMessage.TailsTopic));
                // Block inside the using block so the bus stays alive while messages arrive.
                Console.ReadLine();
            }
        }

        private static void HandleHeads(CoinFlipMessage message)
        {
            if (message == null) return;
            headsCount++;
            var payload = message.Payload;
            Console.WriteLine($"Received {payload} {headsCount} from {CoinFlipMessage.HeadsTopic}.");
            File.AppendAllText(@"heads.txt", payload + Environment.NewLine);
        }

        private static void HandleTails(CoinFlipMessage message)
        {
            if (message == null) return;
            tailsCount++;
            var payload = message.Payload;
            Console.WriteLine($"Received {payload} {tailsCount} from {CoinFlipMessage.TailsTopic}.");
            File.AppendAllText(@"tails.txt", payload + Environment.NewLine);
        }
    }
}
Message:
using System;

namespace Messages
{
    public class CoinFlipMessage
    {
        public string Payload { get; set; }

        // Topic names shared by Producer and Consumer; const so they cannot be reassigned.
        public const string HeadsTopic = "TopicHeads";
        public const string TailsTopic = "TopicTails";
    }
}
Upvotes: 1