Shorty
Shorty

Reputation: 43

ZeroMQ Pub/Sub - Message and Topic being read in wrong order

Issue

Hello! I am writing a bit of code to set up a Publisher/Subscriber Communication over tcp using ZeroMQ. So far I have managed to send and receive the data according to topics. However, the subscriber reads the Topics as Messages and Messages as topics. i.e The Sub program prints:



instead of



I am at a bit of a loss here as I can't seem to figure out why this bug is happening. Any help is greatly appreciated! :)

Here is my code:

Publisher

int Send_Dummy_Data(const std::string tpc, std::string data)
{
    zmq::context_t context(1);
    zmq::socket_t output_publisher(context, ZMQ_PUB);
    std::string transport("tcp://*:5556");
    output_publisher.bind(transport);

    while (true)
    {
        zmq::message_t topic(tpc.length());
        zmq::message_t message(data.length());

        memcpy(topic.data(), tpc.data(), tpc.size());
        memcpy(message.data(), data.data(), data.length());

        try
        {
            output_publisher.send(topic, zmq::send_flags::sndmore);
            output_publisher.send(message, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t &e)
        {
            std::cout << e.what() << std::endl;
            return -1;
        }

        message.rebuild(data.length());
        topic.rebuild(tpc.size());
    }

    return 0;
}

int main(int argc, char *argv[])
{
    Send_Dummy_Data("ABC", "Pub says hello");
    return 0;
}

Subscriber

void sub()
{

    const std::string TOPIC = "ABC";

    std::string transport("tcp://xxx.xxx.xxx.xx:5556");
    zmq::context_t context_t(1); // -? What is context
    zmq::socket_t subscriber(context_t, ZMQ_SUB);
    subscriber.connect(transport);
    subscriber.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length());

    zmq::pollitem_t items[] = {
        {zmq_socket, 0, ZMQ_POLLIN, 0},

    };
    while (true)
    {

        int rc = 0;
        zmq::message_t rx_msg;
        zmq::message_t topic;
        zmq::poll(&items[0], 1, -1);

        if (items[0].revents & ZMQ_POLLIN)
        {
            rc = subscriber.recv(&topic, ZMQ_RCVMORE);
            rc = subscriber.recv(&rx_msg) && rc;

            std::string rx_str;
            std::string rx_topic;

            rx_topic.assign(static_cast<char *>(topic.data()), topic.size());
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            if (rc > 0)
            {
                std::cout << "Topic: " << rx_topic << std::endl;
                std::cout << "Received: " << rx_str << std::endl;
            }
        }
    }
}

int main()
{
    std::thread t_pub(sub);
    t_pub.join();
    return 0;
}

Upvotes: 0

Views: 790

Answers (1)

jamesdillonharvey
jamesdillonharvey

Reputation: 1042

This code looks odd, why are you passing the ZMQ_RCVMORE flag to recv?

            rc = subscriber.recv(&topic, ZMQ_RCVMORE);
            rc = subscriber.recv(&rx_msg) && rc;

You should use ZMQ_RCVMORE to check if there are more messages after the first recv like this (or similar)

    int more;
    size_t more_len = sizeof(more);
    data_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);

What happens if you just change the code to this?

            subscriber.recv(&topic);
            subscriber.recv(&rx_msg);

You may be messing up the order flow by passing a bad flag to recv.

Upvotes: 0

Related Questions