Reputation: 41
GOAL: Send a large batch of messages (40k-100k) to all subscribers in one iteration. I am using the PUB/SUB socket types.
PROBLEM: The subscribers receive fewer messages than the publisher sends. If I add a Thread.Sleep(1) after each send, every message is delivered, but with this many messages that adds a 40-100 second delay, which is unacceptable.
The code below uses NetMQ 3.0.0, which is an alpha build, but it is only an example: I have implemented the same code in C using libzmq 3.2.4 (stable), and the symptoms are the same.
Publisher/server side:
using (var dbConn = new OracleConnection(ConfigurationManager.AppSettings["ConnString"]))
using (NetMQContext ctx = NetMQContext.Create())
{
    using (var publisher = ctx.CreatePublisherSocket())
    {
        publisher.Bind(ConfigurationManager.AppSettings["PubSocket"]);
        dbConn.Open();
        while (true)
        {
            var updateIds = new List<int>();
            var deletedIds = new List<int>();
            var changedRules = GetChangedItems(dbConn, ref updateIds);
            var deletedRules = GetDeletedItems(dbConn, ref deletedIds);
            // Publish every changed item as one CSV line.
            foreach (var kvPair in changedRules)
            {
                var item = kvPair.Value;
                publisher.Send(ToCsvLine(item));
                //Thread.Sleep(1);
            }
            // Publish every deleted item as one CSV line.
            foreach (var kvPair in deletedRules)
            {
                var item = kvPair.Value;
                publisher.Send(ToCsvLine(item));
                //Thread.Sleep(1);
            }
            Thread.Sleep(1);
            // Marker message that tells subscribers the batch is complete.
            publisher.Send("end");
            Console.WriteLine("Sent updated: {0}", updateIds.Count);
            Console.WriteLine("Sent deleted: {0}", deletedIds.Count);
            Thread.Sleep(6000);
        }
    }
}
Subscriber/client side:
using (NetMQContext ctx = NetMQContext.Create())
{
    using (var consumer = ctx.CreateSubscriberSocket())
    {
        consumer.Connect("tcp://192.168.1.122:6005");
        // An empty prefix subscribes to every message.
        consumer.Subscribe("");
        int count = 0;
        while (true)
        {
            try
            {
                string msg = consumer.ReceiveString();
                if (msg == "end")
                {
                    // End of batch: report how many data messages arrived.
                    Console.WriteLine("Count: {0}", count);
                    count = 0;
                }
                else
                {
                    // Count data messages only, not the "end" marker.
                    count++;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }
    }
}
Upvotes: 2
Views: 1561
Reputation: 13766
ZeroMQ sets the high-water mark (HWM) to 1000 messages by default, and the limit is not exact. Once a PUB socket reaches it for a given subscriber, further messages to that subscriber are dropped rather than queued. Can I assume that you could reach or exceed at least half of that in a single batch? If so, that's probably your issue. Set your HWM higher, to 10,000 or 50,000 (or, for testing purposes, turn it off by setting it to 0, which means no limit), and see how your results change.
Upvotes: 5