Reputation: 43
Hello! I am writing a bit of code to set up a Publisher/Subscriber Communication over tcp using ZeroMQ. So far I have managed to send and receive the data according to topics. However, the subscriber reads the Topics as Messages and Messages as topics. i.e The Sub program prints:
instead of
I am at a bit of a loss here as I can't seem to figure out why this bug is happening. Any help is greatly appreciated! :)
Here is my code:
int Send_Dummy_Data(const std::string tpc, std::string data)
{
zmq::context_t context(1);
zmq::socket_t output_publisher(context, ZMQ_PUB);
std::string transport("tcp://*:5556");
output_publisher.bind(transport);
while (true)
{
zmq::message_t topic(tpc.length());
zmq::message_t message(data.length());
memcpy(topic.data(), tpc.data(), tpc.size());
memcpy(message.data(), data.data(), data.length());
try
{
output_publisher.send(topic, zmq::send_flags::sndmore);
output_publisher.send(message, zmq::send_flags::dontwait);
}
catch (zmq::error_t &e)
{
std::cout << e.what() << std::endl;
return -1;
}
message.rebuild(data.length());
topic.rebuild(tpc.size());
}
return 0;
}
int main(int argc, char *argv[])
{
Send_Dummy_Data("ABC", "Pub says hello");
return 0;
}
void sub()
{
const std::string TOPIC = "ABC";
std::string transport("tcp://xxx.xxx.xxx.xx:5556");
zmq::context_t context_t(1); // -? What is context
zmq::socket_t subscriber(context_t, ZMQ_SUB);
subscriber.connect(transport);
subscriber.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length());
zmq::pollitem_t items[] = {
{zmq_socket, 0, ZMQ_POLLIN, 0},
};
while (true)
{
int rc = 0;
zmq::message_t rx_msg;
zmq::message_t topic;
zmq::poll(&items[0], 1, -1);
if (items[0].revents & ZMQ_POLLIN)
{
rc = subscriber.recv(&topic, ZMQ_RCVMORE);
rc = subscriber.recv(&rx_msg) && rc;
std::string rx_str;
std::string rx_topic;
rx_topic.assign(static_cast<char *>(topic.data()), topic.size());
rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
if (rc > 0)
{
std::cout << "Topic: " << rx_topic << std::endl;
std::cout << "Received: " << rx_str << std::endl;
}
}
}
}
int main()
{
std::thread t_pub(sub);
t_pub.join();
return 0;
}
Upvotes: 0
Views: 790
Reputation: 1042
This code looks odd, why are you passing the ZMQ_RCVMORE flag to recv?
rc = subscriber.recv(&topic, ZMQ_RCVMORE);
rc = subscriber.recv(&rx_msg) && rc;
You should use ZMQ_RCVMORE to check if there are more messages after the first recv like this (or similar)
int more;
size_t more_len = sizeof(more);
data_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
What happens if you just change the code to this?
subscriber.recv(&topic);
subscriber.recv(&rx_msg);
You may be messing up the order flow by passing a bad flag to recv.
Upvotes: 0