jcollum

Reputation: 46579

How do I get a ZMQ ROUTER to raise an error if it is busy?

I've got a REQ -> ROUTER -> [DEALER,DEALER... DEALER] setup going where REQ is a client, ROUTER is a queue and the DEALER sockets are workers that process data and send it back to ROUTER, which sends it back to REQ. It works fine when there are enough DEALERs to handle the work. But if I slow down the DEALERs, the ROUTER never tells me that it's getting more work than it can handle.

The docs say:

ROUTER sockets do have a somewhat brutal way of dealing with messages they can't send anywhere: they drop them silently. It's an attitude that makes sense in working code, but it makes debugging hard. The "send identity as first frame" approach is tricky enough that we often get this wrong when we're learning, and the ROUTER's stony silence when we mess up isn't very constructive.

Since ØMQ v3.2 there's a socket option you can set to catch this error: ZMQ_ROUTER_MANDATORY. Set that on the ROUTER socket and then when you provide an unroutable identity on a send call, the socket will signal an EHOSTUNREACH error.

I'm honestly not sure if that's the same problem that I'm seeing. Stony silence sure matches what I'm seeing.

Here's the code for the setup:

var argsToString, buildSocket, client, q;

buildSocket = function(desc, socketType, port) {
  var socket;
  log("creating socket: " + (argsToString(Array.apply(null, arguments))));
  socket = zmq.socket(socketType);
  socket.identity = "" + desc + "-" + socketType + "-" + process.pid + "-" + port;
  return socket;
};

argsToString = function(a) {
  return a.join(', ');
};

client = buildSocket("client", 'req', clientPort);

q = buildSocket("q", "router", qPort);

q.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1);

q.on('error', function() {
  return log('router error ' + argsToString(Array.apply(null, arguments)));
});

I can post more code if needed. The issue is that when the REQ socket sends 10 messages in a second but the DEALERs take 2 seconds to do their work the ROUTER just ignores incoming messages, regardless of ZMQ_ROUTER_MANDATORY. I've sent 1000s of messages and never seen an error (.on 'error') thrown from any of the sockets.

There's talk of ZMQ_HWM out there, but the node driver doesn't seem to support it for DEALERs or ROUTERs.

How can I manage a ROUTER that runs out of places to send messages to?

Upvotes: 0

Views: 918

Answers (3)

Jitske de Vries

Reputation: 11

The ZMQ_DONTWAIT flag might help you. It should make the send fail with an error when the receiving party can't accept any more messages because its queue is full:

http://api.zeromq.org/4-1:zmq-send

ZMQ_DONTWAIT For socket types (DEALER, PUSH) that block when there are no available peers (or all peers have full high-water mark), specifies that the operation should be performed in non-blocking mode. If the message cannot be queued on the socket, the zmq_send() function shall fail with errno set to EAGAIN.
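A minimal sketch of how the EAGAIN case could be turned into an explicit "busy" signal. It assumes a send function that throws synchronously with an error whose `code` is 'EAGAIN'; both `trySend` and the `sendNonBlocking` argument are hypothetical names, not part of any ZeroMQ binding. With a promise-based driver the same check would sit in the catch around the awaited send.

```javascript
// Hypothetical helper: attempts a non-blocking send and reports "busy"
// instead of letting EAGAIN propagate. `sendNonBlocking` is an assumed
// function supplied by the caller (wrap your driver's real send call in it).
function trySend(sendNonBlocking, frames) {
  try {
    sendNonBlocking(frames);
    return { sent: true, busy: false };
  } catch (err) {
    if (err && err.code === 'EAGAIN') {
      // The message could not be queued: no peers, or all peers at their HWM.
      return { sent: false, busy: true };
    }
    throw err; // anything else is a real failure, so let it surface
  }
}
```

The caller can then decide what "busy" means for the application, for example replying to the client with an explicit rejection.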

Upvotes: 1

user3666197

Reputation: 1

To be (manage a saturated ZeroMQ-primitive element)

or

not to be (avoid any saturation)?

While the problem defines the issue as:

"How do I get ... to raise an error if busy?",

any serious design should, by default, take every care to avoid saturation in the first place. Both an end-to-end flow-control plane and an any-to-any signalling control plane therefore need to be carefully designed as part of the saturation-avoidance strategy.

ZeroMQ's scalable Formal Communication Patterns framework has harnessed a great deal of insight into the latter, rather than the former.

Many thanks to Martin Sustrik's & Pieter Hintjens' teams for designing-in that approach.


Using the hidden powers of ZeroMQ vs. attempts to make ZeroMQ-primitive(s) do jobs they are not designed for

As a more general note, this experience shows that the concept of abstraction-rich Formal Communication Patterns is the most exciting and powerful "Dark Mass" surprise behind ZeroMQ / nanomsg, one that is not always seen, understood and harnessed to get the best out of these grand tools.

It is definitely worth the time and effort to read the whole book: first take in just the principal ideas from the figures, only then go down to the high-level design sketches, and code the sources at the very end.

Transport-agnostic, fault-resilient parallel load balancing simply cannot be understood from isolated lines of source code.


Next step

Your scenario may benefit a lot from going beyond a plain .connect() of the basic ZeroMQ elements in a REQ -> ROUTER -> [DEALER,DEALER... DEALER] setup and putting them into a more abstract communication pattern that both meets your application's needs and serves the load-balancing and failure-resilience aspects of real-world use.

You may also need to propagate the <state> of the [worker] client(s) back to the [dispatcher] of the work units, be it a single ZeroMQ primitive or a massive pool of load-balanced / on-demand forked ones.

This may sound complicated for the first few things one tries to code with ZeroMQ, but it is worth at least jumping to page 265 of Code Connected, Volume 1 (PDF), if you have not been reading step by step up to there.

The fastest way to learn would be to look first at Fig. 60 (Republishing Updates) and Fig. 62 (HA Clone Server pair) for a possible high-availability approach, and then go back to the roots, elements and details.

Upvotes: 0

Jason

Reputation: 13766

First of all, if you're implementing a particular pattern (as I know, from your previous question, that you're implementing Paranoid Pirate), then it's always helpful to say that, since it will provide context for your code.

What you're talking about is specifically not addressed in Paranoid Pirate. You can see this by skipping down the guide to the Titanic pattern... when you're dealing with sporadic connectivity issues or, in your case, sporadic availability because your workers are still working as new messages arrive, you have to maintain the state of your workers in your queue to know what you need to do with that message... either send it to an available worker, or store it somewhere so that when a worker becomes available, you can pull it out and send it.

If you do this as strictly as possible, you're subverting the "queue" nature of ZMQ, but you avoid the uncertainty inherent in the HWM that will drop messages, rather than crash your system.

You could maintain a buffer, keep on adding messages to the queue until you recognize that you're, say, 40% into the HWM (which is dependent on the size of your messages)... that will give you a buffer before you have to start saving the messages, but in the end the process is the same.
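A rough sketch of that buffering idea, assuming the threshold is expressed as a fraction of a message-count limit. `createSpillBuffer`, `hwm`, `fraction` and the `store` array (a stand-in for disk or a database) are hypothetical names chosen for illustration; none of them is read from or provided by the socket.

```javascript
// Hypothetical two-tier buffer: jobs stay in memory until the backlog reaches
// a chosen fraction of the high-water mark, then overflow to a slower store.
// `hwm` and `fraction` are illustrative values supplied by the caller.
function createSpillBuffer(hwm, fraction, store) {
  var limit = Math.floor(hwm * fraction); // e.g. 40% of the HWM
  var memory = [];

  return {
    push: function (job) {
      // Once anything has spilled, keep spilling so FIFO order is preserved.
      if (memory.length < limit && store.length === 0) {
        memory.push(job);
        return 'memory';
      }
      store.push(job);
      return 'store';
    },
    shift: function () {
      var job = memory.length > 0 ? memory.shift() : store.shift();
      // Top memory back up from the store, oldest first.
      if (memory.length < limit && store.length > 0) {
        memory.push(store.shift());
      }
      return job;
    },
    size: function () {
      return memory.length + store.length;
    }
  };
}
```

Calling `createSpillBuffer(1000, 0.4, [])` would keep up to 400 jobs in memory and push the rest to the store; `shift()` hands jobs back in arrival order when a worker frees up.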

This is an area where ZMQ offloads responsibility to the application designer, because there's no single "right" way to do things for all scenarios.


EDIT in response to comment:

Here's the basic gist of how I would handle this in node.js:

var worker_count = 0;
var job_count = 0;

// ...

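q.on('message', function() {
    // each frame arrives as a separate argument; this assumes the payload
    // is the last frame (the worker identity comes first on a ROUTER)
    var msg = arguments[arguments.length - 1];

    // ...

    if (msg.toString() === 'ready') worker_count++;
    else job_count--;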

    // ...

    // this could use some TLC, but here's the basic gist of the logic...
    if (job_count >= worker_count) {
        // we'll assume the message was cached when it was received from
        // the req socket, if so nothing else to do here
    }
    else {
        // figure out if there is a cached message ready to go, if so, then...
        q.send(job);
        job_count++;
    }
});
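A slightly fuller sketch of the same bookkeeping, written without any socket calls so the logic can be tested on its own. It adds a bounded backlog so the queue can answer "busy" rather than stay silent. `createDispatcher`, `maxBacklog` and the `sendToWorker` callback are hypothetical; in a real queue `sendToWorker` would wrap the ROUTER send with the worker identity as the first frame.

```javascript
// Hypothetical dispatcher: tracks idle workers and a bounded backlog.
// `sendToWorker(workerId, job)` is an assumed callback supplied by the caller.
function createDispatcher(maxBacklog, sendToWorker) {
  var idleWorkers = []; // identities of workers that reported 'ready'
  var backlog = [];     // jobs waiting for a worker

  // Pair waiting jobs with idle workers until one of the lists runs out.
  function drain() {
    while (idleWorkers.length > 0 && backlog.length > 0) {
      sendToWorker(idleWorkers.shift(), backlog.shift());
    }
  }

  return {
    // Call when a worker says 'ready' or hands back a finished job.
    workerReady: function (workerId) {
      idleWorkers.push(workerId);
      drain();
    },
    // Call when a client request arrives. Returns false when the queue is
    // saturated, so the caller can reply "busy" to the client.
    submit: function (job) {
      if (idleWorkers.length === 0 && backlog.length >= maxBacklog) {
        return false;
      }
      backlog.push(job);
      drain();
      return true;
    },
    backlogSize: function () {
      return backlog.length;
    }
  };
}
```

In the 'message' handlers, the worker side would call `workerReady(identity)` and the client side would call `submit(job)`, sending an explicit error reply to the REQ socket whenever `submit` returns false.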

Upvotes: 1
