carobnodrvo
carobnodrvo

Reputation: 1051

ZeroMQ SUB never receives messages

I am having trouble with a PUB/SUB in ZeroMQ.

After connecting everything, publisher publishes all messages ( socket's send message returns true ) but the SUB never receives them and blocks forever on .recv() function.

Here is the code I am using:

void startPublisher()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB);
    zmq_socket.bind("tcp://127.0.0.1:58951");

    zmq::message_t msg(3);
    memcpy(msg.data(), "abc", 3);

    for(int i = 0; i < 10; i++)
        zmq_socket.send(msg); // <-- always true
}

void startSubscriber()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);

    zmq_socket.connect("tcp://127.0.0.1:58951");
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); // allow all messages

    zmq::message_t msg(3);
    zmq_socket.recv(&msg); // <-- blocks forever (message never received?)
}

Please note that I am runing these 2 functions in two different threads, starting SUB thread first, waiting for some time and then starting publisher thread ( also tried other way around with publisher sending messages in an endless-loop, but didn't work ).

What am I doing wrong here?

Upvotes: 3

Views: 1350

Answers (1)

Clonk
Clonk

Reputation: 2070

Based on your example, the following code works for me. The problem is that the PUB / SUB pattern is a slow joiner, meaning you need to wait a while after binding the PUB socket and before sending any message.

#include <thread>
#include <zmq.hpp>
#include <iostream>
#include <unistd.h>
void startPublisher()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB);
    zmq_socket.bind("tcp://127.0.0.1:58951");
    usleep(100000); // Sending message too fast after connexion will result in dropped message
    zmq::message_t msg(3);
    for(int i = 0; i < 10; i++) {
        memcpy(msg.data(), "abc", 3);
        zmq_socket.send(msg); // <-- always true
        msg.rebuild(3);
        usleep(1); // Temporisation between message; not necessary
    }
}
volatile bool run = false;
void startSubscriber()
{
    zmq::context_t zmq_context(1);
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
    zmq_socket.connect("tcp://127.0.0.1:58951");
    std::string TOPIC = "";
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages
    zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // Timeout to get out of the while loop
    while(run) {
        zmq::message_t msg;
        int rc = zmq_socket.recv(&msg);  // Works fine
        if(rc) // Do no print trace when recv return from timeout
            std::cout << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl;
    }
}
int main() {
    run = true;
    std::thread t_sub(startSubscriber);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(startPublisher);
    t_pub.join();
    sleep(1);
    run = false;
    t_sub.join();
}

Upvotes: 3

Related Questions