Reputation: 33
We are using ZeroMQ for a project with the following architecture: all publishers and subscribers talk only to a central broker, never directly to each other (architecture diagram not reproduced here).
This is basically the module we wrote in C for using ZeroMQ:
/* Standard and ZeroMQ headers used by this module; zmq_ctx_t, zmq_sock_t,
   msg_buffer_t and REQ_TOPIC come from our own project headers (not shown). */
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <syslog.h>
#include <unistd.h>
#include <zmq.h>

#define PUB_ADDR "ipc:///tmp/broker_frontend"
#define SUB_ADDR "ipc:///tmp/broker_backend"
#define PUB_MON_ADDR "inproc://monitor-publisher"
#define SUB_MON_ADDR "inproc://monitor-subscriber"

#define RET_ERROR(x) do { syslog(LOG_ERR, x "%s", zmq_strerror(errno)); cleanupZMQ(); return false; } while (0)

static zmq_ctx_t ctx = NULL;
static zmq_sock_t pub_sock = NULL;
static zmq_sock_t sub_sock = NULL;
static zmq_sock_t sub_mon_sock = NULL;
static zmq_sock_t pub_mon_sock = NULL;

static char* subscriptions[] = { REQ_TOPIC };
static size_t subscriptions_count = (sizeof(subscriptions) / sizeof(subscriptions[0]));
static inline void cleanupZMQ() {
    if (pub_sock) {
        zmq_close(pub_sock);
    }
    if (sub_sock) {
        zmq_close(sub_sock);
    }
    if (sub_mon_sock) {
        zmq_close(sub_mon_sock);
    }
    if (pub_mon_sock) {
        zmq_close(pub_mon_sock);
    }
    if (ctx) {
        zmq_ctx_destroy(ctx);
    }
}
static bool waitForConnect(zmq_sock_t monitor) {
    zmq_msg_t msg;
    bool ret = true;
    bool connected = false;
    bool handshaked = false;
    do {
        zmq_msg_init(&msg);
        int rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive first frame from monitor: ");
        }
        uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
        uint16_t event = *((uint16_t*)data);
        /* The 32-bit event value follows the 16-bit event id, i.e. at offset 2. */
        uint32_t value = *(uint32_t*)(data + 2);
        zmq_msg_close(&msg);
        zmq_msg_init(&msg);
        rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive second frame from monitor: ");
        }
        char* addr = (char*)zmq_msg_data(&msg);
        syslog(LOG_DEBUG, "Event: %u, Value: %u, Addr: %s", event, value, addr);
        if (event == ZMQ_EVENT_CONNECTED) {
            syslog(LOG_INFO, "Connected to '%s'.", addr);
            connected = true;
        } else if (event == ZMQ_EVENT_CONNECT_DELAYED) {
            syslog(LOG_NOTICE, "Connecting delayed!");
        } else if (event == ZMQ_EVENT_CONNECT_RETRIED) {
            syslog(LOG_NOTICE, "Connecting retried!");
        } else if ((event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
                || (event == ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
                || (event == ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)) {
            syslog(LOG_ERR, "Error! Handshake with '%s' failed: %s", addr, zmq_strerror(value));
            handshaked = true;
            ret = false;
        } else if (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED) {
            syslog(LOG_INFO, "Handshake with '%s' succeeded.", addr);
            handshaked = true;
            ret = true;
        } else {
            syslog(LOG_NOTICE, "Unexpected event: %u", event);
        }
        zmq_msg_close(&msg);
    } while (!(connected && handshaked));
    return ret;
}
bool startZMQ() {
    if (ctx == NULL) {
        ctx = zmq_ctx_new();
        if (ctx == NULL) {
            RET_ERROR("Error! Can not open ZMQ context: ");
        }
    } else {
        syslog(LOG_INFO, "ZMQ is already started.");
    }
    if (sub_sock == NULL) {
        sub_sock = zmq_socket(ctx, ZMQ_SUB);
        if (sub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub socket: ");
        }
        if (zmq_socket_monitor(sub_sock, SUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ sub socket: ");
        }
        sub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (sub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub-monitor socket: ");
        }
        if (zmq_connect(sub_mon_sock, SUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub-monitor socket: ");
        }
        for (size_t i = 0; i < subscriptions_count; i++) {
            if (zmq_setsockopt(sub_sock, ZMQ_SUBSCRIBE, subscriptions[i], strlen(subscriptions[i]))) {
                syslog(LOG_ERR, "Error! Can not subscribe to topic '%s': %s", subscriptions[i], zmq_strerror(errno));
            } else {
                syslog(LOG_INFO, "Subscribed to '%s'.", subscriptions[i]);
            }
        }
        if (zmq_connect(sub_sock, SUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub socket: ");
        }
        waitForConnect(sub_mon_sock);
    } else {
        syslog(LOG_INFO, "Subscriber socket is already open.");
    }
    if (pub_sock == NULL) {
        pub_sock = zmq_socket(ctx, ZMQ_PUB);
        if (pub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub socket: ");
        }
        if (zmq_socket_monitor(pub_sock, PUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ pub socket: ");
        }
        pub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (pub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub-monitor socket: ");
        }
        if (zmq_connect(pub_mon_sock, PUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub-monitor socket: ");
        }
        if (zmq_connect(pub_sock, PUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub socket: ");
        }
        waitForConnect(pub_mon_sock);
    } else {
        syslog(LOG_INFO, "Publisher socket is already open.");
    }
    sleep(3);
    return true;
}
size_t sendZMQMsg(const char* topic, size_t topic_len, const msg_buffer_t msg, size_t msg_len) {
    size_t sent = 0;
    int rc = zmq_send(pub_sock, topic, topic_len, ZMQ_SNDMORE);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ topic: %s", zmq_strerror(errno));
        return 0;
    }
    sent += rc;
    rc = zmq_send(pub_sock, msg, msg_len, 0);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ message: %s", zmq_strerror(errno));
        return 0;
    }
    sent += rc;
    return sent;
}

void endZMQ() {
    cleanupZMQ();
}
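For context, a caller of this module looks roughly like this (heavily simplified; the topic, the payload and the assumption that msg_buffer_t is a plain byte-pointer typedef are only for illustration):

#include <stdint.h>
#include <string.h>

/* Hypothetical caller; "zmq_module.h" stands for whatever header declares
   startZMQ(), sendZMQMsg() and endZMQ(). */
#include "zmq_module.h"

int main(void) {
    if (!startZMQ()) {   /* blocks for about 3 s because of the sleep(3) at its end */
        return 1;
    }

    const char topic[] = "status";                 /* illustrative topic   */
    uint8_t payload[] = "hello from this node";    /* illustrative payload */

    /* Without the sleep(3) in startZMQ(), these first messages were the
       ones that silently disappeared. */
    sendZMQMsg(topic, strlen(topic), (msg_buffer_t)payload, sizeof(payload));

    endZMQ();
    return 0;
}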
As you can see, we had to add a sleep(3) at the end of our startZMQ() function. Without it, the first few messages sent would get lost.
Of course we know about the 'slow joiner syndrome'. We ensure that the broker is ready before anything connects to it, and that the subscribers are connected (also with the three-second delay) before the publishers. Still, we have to wait these three seconds before the publishers can use their sockets. Because of the central broker the publishers and subscribers do not know each other, and we do not want them to have to connect directly to each other: we have a lot of both, and if everyone had to connect directly to everyone else, the system would be basically unmaintainable.
We found this Question and this one, and of course we read the Guide, especially this Part, where the Guide itself uses a sleep(1) and then recommends polling with a second socket pair.
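As far as we understand it, that pattern boils down to something like the following (only a sketch, the sync endpoint and the expected subscriber count are made up):

#include <zmq.h>

#define SYNC_ADDR "ipc:///tmp/sync"      // made-up endpoint for the sketch
#define EXPECTED_SUBSCRIBERS 10          // has to be known in advance

// Publisher: bind an extra REP socket and wait for one sync request per
// expected subscriber before starting to publish.
static void guide_style_publisher_sync(void* ctx) {
    void* rep = zmq_socket(ctx, ZMQ_REP);
    zmq_bind(rep, SYNC_ADDR);
    for (int i = 0; i < EXPECTED_SUBSCRIBERS; i++) {
        char buf[1];
        zmq_recv(rep, buf, sizeof(buf), 0);   // "I am subscribed"
        zmq_send(rep, "", 0, 0);              // acknowledge
    }
    zmq_close(rep);
}

// Subscriber: after connecting and subscribing, ping the publisher once.
static void guide_style_subscriber_sync(void* ctx) {
    void* req = zmq_socket(ctx, ZMQ_REQ);
    zmq_connect(req, SYNC_ADDR);
    zmq_send(req, "", 0, 0);
    char buf[1];
    zmq_recv(req, buf, sizeof(buf), 0);
    zmq_close(req);
}

This works, but it requires every subscriber to know the publisher's sync endpoint, i.e. exactly the direct point-to-point wiring we want to avoid by using the broker.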
Is there really no other way in this library to check whether your socket is ready than to poll it?
As you can see, we are already catching the ZMQ events in our waitForConnect function using ZMQ monitor sockets. Should this not be enough? Are we missing something here?
Upvotes: 0
Views: 140
Reputation: 33
All explanations refer to our pub-sub architecture with the central broker.
As described, waiting for the events ZMQ_EVENT_CONNECTED and ZMQ_EVENT_HANDSHAKE_SUCCEEDED was not enough to ensure that the publisher socket is usable. Further tests showed that the subscriber sockets suffer from the same problem.
To be clear: we do not see how ZeroMQ can be used properly when you have no way of telling whether your socket is ready for use.
You can just start sending and accept that the first messages are lost: they are not buffered or queued somewhere, they are simply lost.
You can wait a small amount of time, as shown in this Part of the documentation, but as that same Part explains, this is unreliable: you never know for sure whether the delay was long enough and your sockets are really ready.
The solution we came up with is the following: we added a neutral node to the system.
This node is connected to the broker like everyone else, but does nothing more than answer synchronization requests from every other node in the system.
These requests contain only the PID of the sending node, which is copied into the response. That way every node can tell whether its own request was acknowledged.
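In code, the idea is roughly the following (a sketch only; the topic names, the payload format and the retry interval are invented):

#include <sys/types.h>
#include <unistd.h>   /* getpid */
#include <string.h>
#include <zmq.h>

#define SYNC_REQ_TOPIC "SYNC_REQ"   /* illustrative topic names */
#define SYNC_REP_TOPIC "SYNC_REP"

/* Neutral node: echoes every PID it sees back on the reply topic. */
static void sync_node_loop(void* pub, void* sub /* subscribed to SYNC_REQ_TOPIC */) {
    for (;;) {
        char topic[64];
        pid_t pid;
        zmq_recv(sub, topic, sizeof(topic), 0);            /* topic frame   */
        zmq_recv(sub, &pid, sizeof(pid), 0);               /* payload frame */
        zmq_send(pub, SYNC_REP_TOPIC, strlen(SYNC_REP_TOPIC), ZMQ_SNDMORE);
        zmq_send(pub, &pid, sizeof(pid), 0);               /* echo the PID  */
    }
}

/* Any other node: keep asking until its own PID comes back. Only then do we
   consider the PUB and SUB sockets really usable. */
static void wait_until_ready(void* pub, void* sub /* subscribed to SYNC_REP_TOPIC */) {
    pid_t me = getpid();
    int timeout = 100;  /* ms; retry the request until the echo arrives */
    zmq_setsockopt(sub, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
    for (;;) {
        zmq_send(pub, SYNC_REQ_TOPIC, strlen(SYNC_REQ_TOPIC), ZMQ_SNDMORE);
        zmq_send(pub, &me, sizeof(me), 0);
        char topic[64];
        pid_t echoed = 0;
        if (zmq_recv(sub, topic, sizeof(topic), 0) < 0) {
            continue;                                      /* timeout: ask again */
        }
        zmq_recv(sub, &echoed, sizeof(echoed), 0);
        if (echoed == me) {
            return;                                        /* round trip succeeded */
        }
    }
}

Because the round trip goes through the broker over the same PUB and SUB sockets, it exercises exactly the path the real messages take, so once our own PID comes back we know the whole chain is up.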
This solution comes with two drawbacks:
It also exacerbates the problem that you do not know whether your counterpart is ready. Even if your nodes learn that the counterpart is up from some source other than message exchange (e.g. from top or by looking into /proc), you still do not know whether the ZeroMQ connections of that counterpart are ready.
So for every publisher you have to decide whether it simply sends a stream and does not care whether anyone is listening, or whether every message has to be acknowledged by the counterpart.
Upvotes: 0
Reputation: 8424
Fundamentally, you're limited by the fact that TCP (which underpins ZMQ) is buffered. ZMQ does a lot to ensure that messages sent are delivered (heartbeats, whole-message delivery, etc.) so long as the connection is live, but it doesn't solve all problems.
TCP and ZMQ both implement the "Actor Model": data sent is buffered by the transport, so the sender can return from a send() before the recipient has called recv(). If you want an absolute guarantee of delivery (or confirmation that it has not made it), you need some sort of ack message travelling back to the sender.
What you then end up with is Communicating Sequential Processes (CSP). This is like the Actor Model, except that the connection between sender and recipient does not buffer the data being transferred: the send() does not complete until the recipient's recv() has completed. This serves as an execution rendezvous (completing the send tells your code where execution has got to, namely that the recipient's recv() has completed). This is particularly useful if you have a (quasi) real-time requirement; your sender can know if the recipient isn't keeping up!
CSP is implemented in Go and Rust - it's kinda coming back into fashion.
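To illustrate the ack idea with plain ZeroMQ (a minimal sketch, deliberately ignoring your broker topology; the endpoint is invented):

#include <assert.h>
#include <string.h>
#include <zmq.h>

/* Sender side: zmq_recv() on the REQ socket blocks until the receiver has
   actually processed the message and replied, which gives you the execution
   rendezvous described above. */
static void send_with_ack(void* ctx) {
    void* req = zmq_socket(ctx, ZMQ_REQ);
    zmq_connect(req, "ipc:///tmp/ack_demo");
    zmq_send(req, "work item", 9, 0);
    char ack[4];
    int rc = zmq_recv(req, ack, sizeof(ack), 0);   /* blocks until the REP answers */
    assert(rc >= 0);
    zmq_close(req);
}

/* Receiver side: only after it has handled the message does it send the ack. */
static void recv_and_ack(void* ctx) {
    void* rep = zmq_socket(ctx, ZMQ_REP);
    zmq_bind(rep, "ipc:///tmp/ack_demo");
    char buf[64];
    zmq_recv(rep, buf, sizeof(buf), 0);
    /* ... handle the message ... */
    zmq_send(rep, "ack", 3, 0);
    zmq_close(rep);
}

That isn't true CSP, because the request itself is still buffered in flight, but the blocking wait for the reply gives the sender the same kind of rendezvous.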
==EDIT==
With multi-stage patterns like this, the best thing to do is to have "readiness" ripple from the end back towards the start. I've often done this with explicit messaging to create such an execution rendezvous, but you've indicated a desire not to have to do that.
So, you could start things up in that order: the proxy first, then the subscribers, then the publishers, each stage using its monitor events (as in your waitForConnect) to confirm its connection before the next stage starts.
The only flaw is if you want all the publishers to block until all of them are connected to the proxy. The only way of achieving that is with some other interconnectivity (e.g. a PUB on the proxy that the publishers explicitly SUBscribe to in order to receive a "go" message). With such an execution rendezvous one is still fundamentally limited by the fact that TCP/IP buffers data everywhere, so the "go" message isn't going to arrive everywhere at exactly the same time; that likely matters only for the most severe real-time requirements. However, by doing everything from the end to the start, one does at least guarantee that processes reach readiness in the right order.
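Such a "go" channel could look roughly like this (endpoint and topic are invented; note that the control channel is itself a PUB/SUB pair and therefore has the same slow-joiner caveat, which is why the sketch repeats the message; an XPUB that watches the subscriptions arrive would be cleaner):

#include <string.h>
#include <unistd.h>
#include <zmq.h>

#define CTRL_ADDR "ipc:///tmp/broker_control"  /* invented control endpoint */
#define GO_TOPIC  "GO"

/* Proxy side: once everything downstream is ready, broadcast "go" on a
   dedicated control PUB socket (repeated, because of slow joiners). */
static void proxy_release_publishers(void* ctx) {
    void* ctrl = zmq_socket(ctx, ZMQ_PUB);
    zmq_bind(ctrl, CTRL_ADDR);
    /* ... wait until the downstream side is ready (monitor events etc.) ... */
    for (int i = 0; i < 50; i++) {
        zmq_send(ctrl, GO_TOPIC, strlen(GO_TOPIC), 0);
        usleep(100 * 1000);
    }
    zmq_close(ctrl);
}

/* Publisher side: block on the control SUB socket until "go" arrives,
   then start publishing real data. */
static void wait_for_go(void* ctx) {
    void* ctrl = zmq_socket(ctx, ZMQ_SUB);
    zmq_setsockopt(ctrl, ZMQ_SUBSCRIBE, GO_TOPIC, strlen(GO_TOPIC));
    zmq_connect(ctrl, CTRL_ADDR);
    char buf[8];
    zmq_recv(ctrl, buf, sizeof(buf), 0);   /* blocks until the proxy says go */
    zmq_close(ctrl);
}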
By the way, your code snippet omits the proxy shown in the figure.
Upvotes: 1