Reputation: 575
I've noticed weird behavior with zmq.PUB
, it requires to have while loop to send message. Example:
Imagine I have a sub waiting for a pub:
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect ('tcp://*:5555')
while True:
msg = socket.recv()
And I want to send a message from pub:
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://*:%5555')
publisher.send(''.join(sys.argv[1]))
This message won't reach the sub for some reason. You can fix this by adding while
or for
loop before sending a message publisher.send(''.join(sys.argv[1]))
Why is that? Do I always have to have a loop with publisher to distribute messages to multiple workers?
Upvotes: 3
Views: 1851
Reputation: 312630
Actually, there are a few problems here.
First, the code you've posted has a number of errors in it (for example, you can't connect
to tcp://*:5555
), but I'll assume that it's representative of what you're trying to do.
Update
I'm going to leave my original answer below, because I think it's still useful although not directly relevant to your question (possibly due to a morning caffeine deficit, who knows?).
Since your publisher is calling send
with a single message and then
exiting immediately, the subscribers probably never have time to
connect()
. You would typically expect the code calling bind
to be
the long-running process. If you insert a sleep
between the bind
and send
call, things will work as expected:
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind('tcp://*:5555')
time.sleep(1)
pub.send('message 1')
This works fine, assuming the following subscriber:
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://localhost:5555')
while True:
msg = sub.recv()
print msg
However, this isn't a great solution...anything using sleep
for
synchronization is bound to fail. A different way of solving this
would be to have your publisher open both a PUB
socket and a REP
socket. To send a message to all subscribers, your tool would open a
REQ
socket to the broker, which would then publish it to all
subscribers.
Original answer follows
The problem you're hitting is probably
this. Since you are
having your subscriber call bind
and your sender call connect
,
it is likely that your are simply losing the message because there is
a delay between a successful connection and the subscriber
successfully subscribing to messages. Also see the question "Why do I see
different behavior when I bind a socket versus connect a socket?" in
the FAQ.
Demonstrating this with code, if we have this subscriber:
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.bind('tcp://*:5555')
while True:
msg = sub.recv()
print msg
And the following publisher:
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.connect('tcp://localhost:5555')
pub.send('message 1')
pub.send('message 2')
time.sleep(1)
pub.send('message 3')
The subscriber will probably always end up printing:
message 3
This is because the first to send
operations happen to quickly.
This is generally not considered a problem, since pub/sub is explicitly not a reliable transport mechanism. In the typical use case, there is a single publisher calling bind
and multiple subscribers calling connect
that may not all be present initially or continuously. There is no guarantee that every subscriber will receive every message.
If you need to reliably transport a single message, consider a REQ
/REP
socket pair instead.
Upvotes: 2