Reputation: 1870
I can only find old C++ source examples. Anyway, I wrote mine based on them. Here's my publisher in Python:
import zmq
from time import sleep  # sleep() is used in the loop below
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
    msg = "hello"
    socket.send_string(msg)
    print("sent " + msg)
    sleep(5)
And here's the subscriber in C++:
void *ctx = zmq_ctx_new();
void *subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
    zmq_msg_t msg;
    int rc;
    rc = zmq_msg_init(&msg);
    assert(rc == 0);
    std::cout << "waiting for message..." << std::endl;
    rc = zmq_msg_recv(&msg, subscriber, 0);
    assert(rc == 1);
    std::cout << "received: " << (char *) zmq_msg_data(&msg) << std::endl;
    zmq_msg_close(&msg);
}
Initially, I tried zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); but I guess I should receive everything if I don't set this, right? So I commented it out.
When I run the code, I see "waiting for message..." forever.
I tried to listen to the TCP traffic using tcpdump. It turns out that while the publisher is running, I see a lot of garbage on port 5563, and when I turn the publisher off, it stops. When I tried a PUSH/PULL scheme, I could see the plaintext message in tcpdump. (I tried pushing with Node.js and pulling with C++, and it worked.)
What could I be doing wrong?
I tried different combinations of .bind(), .connect(), localhost, and 127.0.0.1, but none of them worked either.
UPDATE: I've just read that I must subscribe to something, so I did zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); to subscribe to everything, but I still receive nothing.
PyZMQ is in version 17.0.0.b3 and has ZeroMQ 4.2.3
C++ has ZeroMQ 4.2.2
UPDATE 2: I updated both to 4.2.3, but it still doesn't work.
Upvotes: 11
Views: 3599
Reputation: 1870
It's me, the one who asked the question.
I managed to get it working by changing socket.bind("tcp://*:5563") to socket.connect("tcp://dns_address_of_my_docker_container:5564") in Python, and changing zmq_connect(subscriber, "tcp://localhost:5563") to zmq_bind(subscriber, "tcp://*:5563") in C++.
The examples that I found online said that I should use bind for the publisher and connect for the subscriber, but that wouldn't work for me in any way. Does anyone have an idea why?
ZeroMQ documentation says the following:
The zmq_bind() function binds the socket to a local endpoint and then accepts incoming connections on that endpoint.
The zmq_connect() function connects the socket to an endpoint and then accepts incoming connections on that endpoint.
I don't have a clear idea of what changed, but it worked.
Upvotes: 0
Reputation: 1299
The correct recipe to run the PUB/SUB pattern (regardless of language) is:
Pub
1. socket(zmq.PUB)
2. bind("tcp://127.0.0.1:5555")
3. encoded_topic = topic.encode()
   encoded_msg = msg.encode()
4. send_multipart([encoded_topic, encoded_msg])
Sub
1. socket(zmq.SUB)
2. setsockopt(zmq.SUBSCRIBE, topic.encode())
3. connect("tcp://127.0.0.1:5555")
4. answer = recv_multipart()
5. enc_topic, enc_msg = answer
   topic = enc_topic.decode()
   msg = enc_msg.decode()
In general, steps Pub-2 / Sub-3 (i.e. bind/connect) and Pub-3 / Sub-5 (i.e. encode/decode or dumps/loads) need to be complementary to each other for things to work.
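To make the encode/decode pairing concrete, here is a minimal sketch that needs no sockets. The helper names pack and unpack are hypothetical (they are not part of pyzmq), and the JSON step assumes the payload is JSON-serialisable. pack builds the list of byte frames a publisher would pass to send_multipart(), and unpack is its exact inverse for whatever recv_multipart() returns on the subscriber side.

```python
import json

def pack(topic, payload):
    """Publisher side: build [topic_frame, body_frame], both as bytes."""
    return [topic.encode("utf-8"), json.dumps(payload).encode("utf-8")]

def unpack(frames):
    """Subscriber side: undo pack() step by step, in reverse order."""
    enc_topic, enc_body = frames
    return enc_topic.decode("utf-8"), json.loads(enc_body.decode("utf-8"))

frames = pack("weather", {"temp_c": 21})
print(frames)          # [b'weather', b'{"temp_c": 21}']
print(unpack(frames))  # ('weather', {'temp_c': 21})
```

If one side used dumps while the other only called decode(), the subscriber would end up with a JSON string rather than the original object, which is the kind of mismatch the complementary steps are meant to avoid.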
Upvotes: 2
Reputation: 1
"I guess I should receive everything if I don't set this, right?"
No, this is not a correct assumption: a SUB socket with no subscription set receives nothing. You may like a collection of my other ZeroMQ posts here, about the { plain-string | unicode | serialisation } issues and the { performance | traffic } impacts of the actual policy ( SUB-side topic-filter processing on early ZeroMQ versions, and/or PUB-side processing on more recent ones ) one may encounter in heterogeneous distributed-systems design using ZeroMQ.
( Any other Scalable Formal Communication Archetype Pattern, like the observed PUSH/PULL, has nothing to do with the subscription policy, so it will work independently of the subscription-matching processing against a set topic-filter list. )
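The matching rule can be modelled in a few lines of plain Python. This is only an illustrative sketch, not ZeroMQ code: would_deliver is a hypothetical helper that mimics how a SUB socket compares the leading bytes of a message against its list of subscription prefixes. With no subscription nothing gets through, while the empty prefix matches every message.

```python
def would_deliver(message, subscriptions):
    """Return True if any subscribed prefix matches the start of message."""
    return any(message.startswith(prefix) for prefix in subscriptions)

print(would_deliver(b"hello", []))          # False: nothing subscribed
print(would_deliver(b"hello", [b""]))       # True: empty prefix matches all
print(would_deliver(b"hello", [b"he"]))     # True: prefix match
print(would_deliver(b"hello", [b"world"]))  # False: prefix differs
```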
Check whether the sender .send()-s anything at all:
Let's mock up a fast Pythonic receiver to see whether the sender indeed sends anything down the line:
import zmq
aContext = zmq.Context()                      # .new Context
aSUB = aContext.socket( zmq.SUB )             # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" )        # .connect
aSUB.setsockopt( zmq.LINGER, 0 )              # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, b"" )         # .set T-filter ( bytes; empty matches all )
MASK = "INF: .recv()-ed this:[{0:}]\n: waited {1: > 7d} [us]"
aClk = zmq.Stopwatch()
while True:
    try:
        aClk.start()
        print( MASK.format( aSUB.recv(), aClk.stop() ) )
    except ( KeyboardInterrupt, SystemExit ):
        break
aSUB.close()                                  # .close ALWAYS!
aContext.term()                               # .term ALWAYS!
This ought to report whatever the PUB-sender is actually .send()-ing over the wire, and also the actual message inter-arrival times ( in [us]; glad ZeroMQ has included this tool for debugging and performance / latency tweaking ).
If ACK-ed, i.e. you see the live INF:-messages actually ticking on screen, keep it running; it now makes sense to proceed to the next step.
#include <zmq.h>
#include <cassert>
#include <iostream>
#include <string>

int main() {
    void *aContext = zmq_ctx_new();
    void *aSUB = zmq_socket( aContext, ZMQ_SUB );  std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
    zmq_connect( aSUB, "tcp://127.0.0.1:5563" );   std::cout << "INF: .. zmq_connect() done" << std::endl;
    zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 );  std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
    int aLinger = 0;                               /* ZMQ_LINGER takes a pointer to an int plus its size */
    zmq_setsockopt( aSUB, ZMQ_LINGER, &aLinger, sizeof( aLinger ) );
    std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
    int rc;
    while (true) {
        zmq_msg_t msg;                             /* Create an empty ØMQ message */
        rc = zmq_msg_init (&msg);          assert (rc ==  0 && "EXC: in zmq_msg_init() call" );
        std::cout << "INF: .. zmq_msg_init() done" << std::endl;
        rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
        /* The payload is not NUL-terminated, so build the string from data + size */
        std::string aText( static_cast<char *>( zmq_msg_data( &msg ) ), zmq_msg_size( &msg ) );
        std::cout << "INF: .. zmq_msg_recv() done: received [" << aText << "]" << std::endl;
        zmq_msg_close (&msg);                      /* Release message */
        std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
    }
    zmq_close( aSUB );        std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
    zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
    return 0;
}
Upvotes: 5
Reputation: 10729
What is the return value for zmq_setsockopt()?
Then you should use "" instead of NULL; they are different.
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
As the API defines:
Return value
The zmq_setsockopt() function shall return zero if successful. Otherwise it shall return -1 and set errno to one of the values defined below.
...
Upvotes: 2