Reputation: 245
I'm having an issue with ZeroMQ, which I believe is because I'm not very familiar with it.
I'm trying to build a very simple service where multiple clients connect to a server and send a query, and the server responds to each query.
When I use the REQ-REP socket combination (client using REQ, server binding to a REP socket), I'm able to get close to 60,000 messages per second on the server side (when client and server are on the same machine). When distributed across machines, each new client instance on a different machine increases the message rate at the server roughly linearly, and it easily reaches 40,000+ with enough client instances.
Now, a REP socket is blocking, so I followed the ZeroMQ guide and used the rrbroker pattern (http://zguide.zeromq.org/cs:rrbroker):
REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)]
However, this completely screws up the performance. I'm getting only around 4,000 messages per second at the server when running across machines. Not only that, each new client started on a different machine reduces the throughput of every other client.
I'm pretty sure I'm doing something stupid. I'm wondering if ZeroMQ experts here can point out any obvious mistakes. Thanks!
Edit: Adding code as per advice. I'm using the clrzmq NuGet package (https://www.nuget.org/packages/clrzmq-x64/)
Here's the client code. A timer counts how many responses are received every second.
for (int i = 0; i < numTasks; i++)
{
    Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning);
}

void Client()
{
    using (var ctx = new Context())
    {
        Socket socket = ctx.Socket(SocketType.REQ);
        socket.Connect("tcp://192.168.1.10:1234");
        while (true)
        {
            socket.Send("ping", Encoding.Unicode);
            string res = socket.Recv(Encoding.Unicode);
        }
    }
}
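The counting timer itself isn't shown above; a minimal sketch of one way such a counter could look (the names here are illustrative, not the code actually used):

// Illustrative only - not the question's actual counter code.
// Each client increments a shared counter after every reply; a timer reads
// and resets it once per second.
static long responsesThisSecond;

static readonly System.Threading.Timer reportTimer = new System.Threading.Timer(_ =>
{
    long count = Interlocked.Exchange(ref responsesThisSecond, 0);
    Console.WriteLine("Responses/sec: {0}", count);
}, null, 1000, 1000);

// In Client(), after socket.Recv(...):
//     Interlocked.Increment(ref responsesThisSecond);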
Server - case 1: The server keeps track of how many requests are received per second
using (var zmqContext = new Context())
{
    Socket socket = zmqContext.Socket(SocketType.REP);
    socket.Bind("tcp://*:1234");
    while (true)
    {
        string q = socket.Recv(Encoding.Unicode);
        if (q.CompareTo("ping") == 0)
        {
            socket.Send("pong", Encoding.Unicode);
        }
    }
}
With this setup, on the server side, I can see around 60,000 requests received per second (when the client is on the same machine). With clients on different machines, each new client increases the number of requests received at the server, as expected.
Server - case 2: This is essentially the rrbroker pattern from the ZMQ guide.
// routerSocket, dealerSocket and workerPool are class-level fields (declared elsewhere).
void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers)
{
    List<PollItem> pollItemsList = new List<PollItem>();

    routerSocket = zmqContext.Socket(SocketType.ROUTER);
    try
    {
        routerSocket.Bind(zmqConnectionString);
        PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN);
        pollItem.PollInHandler += RouterSocket_PollInHandler;
        pollItemsList.Add(pollItem);
    }
    catch (ZMQ.Exception ze)
    {
        Console.WriteLine("{0}", ze.Message);
        return;
    }

    dealerSocket = zmqContext.Socket(SocketType.DEALER);
    try
    {
        dealerSocket.Bind("inproc://workers");
        PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN);
        pollItem.PollInHandler += DealerSocket_PollInHandler;
        pollItemsList.Add(pollItem);
    }
    catch (ZMQ.Exception ze)
    {
        Console.WriteLine("{0}", ze.Message);
        return;
    }

    // Start the worker pool; workers can't connect
    // to the inproc socket before it is bound.
    workerPool.Start(numWorkers);

    while (true)
    {
        zmqContext.Poll(pollItemsList.ToArray());
    }
}

void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
    RelayMessage(routerSocket, dealerSocket);
}

void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
    RelayMessage(dealerSocket, routerSocket);
}

// Relays every frame of a multipart message from source to destination,
// preserving the MORE flag on all but the last frame.
void RelayMessage(Socket source, Socket destination)
{
    bool hasMore = true;
    while (hasMore)
    {
        byte[] message = source.Recv();
        hasMore = source.RcvMore;
        destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE);
    }
}
Where the worker pool's start method is:
public void Start(int numWorkerTasks = 8)
{
    for (int i = 0; i < numWorkerTasks; i++)
    {
        QueryWorker worker = new QueryWorker(this.zmqContext);
        Task task = Task.Factory.StartNew(() =>
            worker.Start(),
            TaskCreationOptions.LongRunning);
    }
    Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks);
}
public class QueryWorker
{
    Context zmqContext;

    public QueryWorker(Context zmqContext)
    {
        this.zmqContext = zmqContext;
    }

    public void Start()
    {
        Socket socket = this.zmqContext.Socket(SocketType.REP);
        try
        {
            socket.Connect("inproc://workers");
        }
        catch (ZMQ.Exception ze)
        {
            Console.WriteLine("Could not create worker, error: {0}", ze.Message);
            return;
        }

        while (true)
        {
            try
            {
                string message = socket.Recv(Encoding.Unicode);
                if (message.CompareTo("ping") == 0)
                {
                    socket.Send("pong", Encoding.Unicode);
                }
            }
            catch (ZMQ.Exception ze)
            {
                Console.WriteLine("Could not receive message, error: " + ze.ToString());
            }
        }
    }
}
Upvotes: 3
Views: 1912
Reputation: 11
I have done performance testing on calling a native unmanaged DLL function from C# with various methods:
1. C++/CLI wrapper
2. P/Invoke
3. ZeroMQ/clrzmq
The last might be interesting for you.
My finding at the end of my performance test was that using the clrzmq binding was not worthwhile: it added roughly a factor of 100 performance overhead, even after I tried to optimize the P/Invoke calls inside the source code of the binding. So I now use ZeroMQ without a binding, via direct P/Invoke calls. These calls must be made with the cdecl calling convention and with the "SuppressUnmanagedCodeSecurity" attribute to get the best speed. I had to import just 5 functions, which was fairly easy. In the end the round trip over ZeroMQ (in my case over "inproc") was only a bit slower than a plain P/Invoke call.
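For illustration, a direct binding along these lines is what I mean; the native library name, the exact set of imported functions, and the REQ snippet below are assumptions for the sketch and need adjusting to your libzmq build:

using System;
using System.Runtime.InteropServices;
using System.Security;
using System.Text;

// Direct P/Invoke into the native libzmq C API (cdecl), with
// SuppressUnmanagedCodeSecurity to avoid the per-call security stack walk.
[SuppressUnmanagedCodeSecurity]
static class LibZmq
{
    // The library name is an assumption; on Windows it is typically something
    // like "libzmq-v120-mt-4_0_x.dll".
    const string Lib = "libzmq";

    [DllImport(Lib, CallingConvention = CallingConvention.Cdecl)]
    public static extern IntPtr zmq_ctx_new();

    [DllImport(Lib, CallingConvention = CallingConvention.Cdecl)]
    public static extern IntPtr zmq_socket(IntPtr context, int type);

    [DllImport(Lib, CallingConvention = CallingConvention.Cdecl)]
    public static extern int zmq_connect(IntPtr socket, string endpoint);

    [DllImport(Lib, CallingConvention = CallingConvention.Cdecl)]
    public static extern int zmq_send(IntPtr socket, byte[] buf, UIntPtr len, int flags);

    [DllImport(Lib, CallingConvention = CallingConvention.Cdecl)]
    public static extern int zmq_recv(IntPtr socket, byte[] buf, UIntPtr len, int flags);
}

class PingClient
{
    const int ZMQ_REQ = 3;  // socket type constant from zmq.h

    static void Main()
    {
        IntPtr ctx = LibZmq.zmq_ctx_new();
        IntPtr req = LibZmq.zmq_socket(ctx, ZMQ_REQ);
        LibZmq.zmq_connect(req, "tcp://127.0.0.1:1234");

        byte[] ping = Encoding.ASCII.GetBytes("ping");
        byte[] reply = new byte[256];
        LibZmq.zmq_send(req, ping, (UIntPtr)ping.Length, 0);
        int n = LibZmq.zmq_recv(req, reply, (UIntPtr)reply.Length, 0);
        Console.WriteLine(Encoding.ASCII.GetString(reply, 0, Math.Max(n, 0)));
    }
}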
This may give you a hint to try it without the binding, if speed matters to you.
This is not a direct answer to your question, but it may help you increase performance in general.
Upvotes: 1
Reputation: 1715
Most probably the 'ROUTER' is the bottleneck.
Check out the related questions on this topic.
ROUTER (and ZMQ_STREAM, which is just a variant of ROUTER) has to maintain the client mapping internally, hence IMO it can only accept a limited number of connections from a particular client. It looks like ROUTER can multiplex multiple clients only as long as each client has a single active connection.
I could be wrong here, but I am not seeing much proof to the contrary (simple working code that scales to multiple clients with multiple connections over ROUTER or STREAM).
There certainly is a very severe restriction on concurrent connections with ZeroMQ, though it looks like no one knows what is causing it.
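To make the "client mapping" concrete: at the application level it shows up as an identity frame that the ROUTER prepends to every inbound message, and which has to be sent back with the reply so libzmq can route it to the right client. A rough sketch using the same clrzmq calls as the question's broker code (the three-frame layout assumes REQ clients, which add the empty delimiter frame):

// Per-request envelope seen by a ROUTER socket when the peer is a REQ client:
//   frame 1: identity of the client connection (added by the ROUTER side)
//   frame 2: empty delimiter (added by the REQ socket)
//   frame 3: application payload
byte[] identity = routerSocket.Recv();
byte[] delimiter = routerSocket.Recv();
byte[] payload = routerSocket.Recv();

// The reply must carry the same identity frame to reach the right client;
// here we simply echo the request payload back as the last frame.
routerSocket.Send(identity, identity.Length, SendRecvOpt.SNDMORE);
routerSocket.Send(delimiter, delimiter.Length, SendRecvOpt.SNDMORE);
routerSocket.Send(payload, payload.Length, SendRecvOpt.NONE);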
Upvotes: 1
Reputation: 6669
Could you post some source code or at least a more detailed explanation of your test case? In general the way to build out your design is to make one change at a time, and measure at each change. You can always move stepwise from a known working design to more complex ones.
Upvotes: 1