Reputation: 329
For my Publisher/Subscriber pattern I want to use topics. So publish different messages on different topics. I already used topics in ZMQ with Python, but can not find how to use in C++. Is it possible to use topics with zmqcpp, or do I have to use different ports?
My publisher is very simple, similar to this one: http://zguide.zeromq.org/cpp:durapub
Thanks
Upvotes: 2
Views: 17005
Reputation: 1964
Assuming you already have a ZMQ server set up, topics are built in so simply do the following on your clients to subscribe utilizing the C++ API:
#include <zmq.hpp>
#include <zmq_addon.hpp>
.
.
.
std::unique_ptr<zmq::context_t> _context
std::shared_ptr<zmq::socket_t> _pubSocket;
std::shared_ptr<zmq::socket_t> _subSocket;
.
.
.
this->_context = std::make_unique<zmq::context_t>(this->_processingThreadCount);
this->_pubSocket = std::make_shared<zmq::socket_t>(*this->_context, zmq::socket_type::pub);
this->_pubSocket->connect(pubUrl);
this->_subSocket = std::make_shared<zmq::socket_t>(*this->_context, zmq::socket_type::sub);
this->_subSocket->connect(subUrl);
.
.
.
// this line allows your client to subscribe to a specific topic
this->_subSocket->set(zmq::sockopt::subscribe, topic);
.
.
.
// these lines will allow you to publish to a specific topic
zmq::message_t zmqTopic(topic);
zmq::message_t zmqEvent(event);
zmq::message_t zmqData(data);
this->_pubSocket->send(zmqTopic, zmq::send_flags::sndmore);
this->_pubSocket->send(zmqEvent, zmq::send_flags::sndmore);
this->_pubSocket->send(zmqData, zmq::send_flags::none);
Upvotes: 0
Reputation: 2070
Here is an example of a pub-sub in C++ :
#include <thread>
#include <zmq.hpp>
#include <iostream>
#include <signal.h>
#include <unistd.h>
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
if(s_interrupted == 0)
{
std::cout << "sighandler" << std::endl;
s_interrupted = 1;
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PAIR);
socket.connect("ipc://killmebaby");
zmq::message_t msg;
memcpy(msg.data(),"0", 1);
socket.send(msg);
}
}
// Setup signal handler to quit the program
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
const std::string TOPIC = "4567";
void startPublisher()
{
zmq::context_t zmq_context(1);
zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB);
zmq_socket.bind("ipc://localsock");
usleep(100000); // Sending message too fast after connexion will result in dropped message
zmq::message_t msg(3);
zmq::message_t topic(4);
for(int i = 0; i < 10; i++) {
memcpy(topic.data(), TOPIC.data(), TOPIC.size()); // <= Change your topic message here
memcpy(msg.data(), "abc", 3);
try {
zmq_socket.send(topic, ZMQ_SNDMORE);
zmq_socket.send(msg);
} catch(zmq::error_t &e) {
std::cout << e.what() << std::endl;
}
msg.rebuild(3);
topic.rebuild(4);
usleep(1); // Temporisation between message; not necessary
}
}
void startSubscriber()
{
zmq::context_t zmq_context(1);
zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
zmq_socket.connect("ipc://localsock");
zmq::socket_t killer_socket(zmq_context, ZMQ_PAIR); // This socket is used to terminate the loop on a signal
killer_socket.bind("ipc://killmebaby");
zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // Subscribe to any topic you want here
zmq::pollitem_t items [] = {
{ zmq_socket, 0, ZMQ_POLLIN, 0 },
{ killer_socket, 0, ZMQ_POLLIN, 0 }
};
while(true)
{
int rc = 0;
zmq::message_t topic;
zmq::message_t msg;
zmq::poll (&items [0], 2, -1);
if (items [0].revents & ZMQ_POLLIN)
{
std::cout << "waiting on recv..." << std::endl;
rc = zmq_socket.recv(&topic, ZMQ_RCVMORE); // Works fine
rc = zmq_socket.recv(&msg) && rc;
if(rc > 0) // Do no print trace when recv return from timeout
{
std::cout << "topic:\"" << std::string(static_cast<char*>(topic.data()), topic.size()) << "\"" << std::endl;
std::cout << "msg:\"" << std::string(static_cast<char*>(msg.data()), msg.size()) << "\"" << std::endl;
}
}
else if (items [1].revents & ZMQ_POLLIN)
{
if(s_interrupted == 1)
{
std::cout << "break" << std::endl;
break;
}
}
}
}
int main() {
s_catch_signals ();
run = true;
std::thread t_sub(startSubscriber);
sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
std::thread t_pub(startPublisher);
t_pub.join();
t_sub.join();
}
You can find many more example in the examples section of the github repository
Upvotes: 5
Reputation: 4006
Not sure about the C++ API but with the C API you can subscribe to topics with the ZMQ_SUBSCRIBE socket option. I suspect the C++ API has a similar function.
This simply filters on messages that start with the same text as the topic text. You can use Pub-Sub Message Envelopes for a more robust solution. I can image that the Python API hides these implementation details.
Upvotes: 0