izdi

Reputation: 575

ZMQ Pub requires while loop to send message

I've noticed weird behavior with zmq.PUB: it seems to need a while loop in order to send a message. For example:

Imagine I have a sub waiting for a pub:

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect ('tcp://*:5555')
while True:
    msg = socket.recv()

And I want to send a message from pub:

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://*:%5555')
publisher.send(''.join(sys.argv[1]))

This message never reaches the subscriber. If I wrap the publisher.send(''.join(sys.argv[1])) call in a while or for loop, however, it gets through.

Why is that? Do I always need a loop in the publisher to distribute messages to multiple workers?

Upvotes: 3

Views: 1851

Answers (1)

larsks

Reputation: 312630

Actually, there are a few problems here.

First, the code you've posted has a number of errors in it (for example, you can't connect to tcp://*:5555, because the * wildcard is only valid when binding), but I'll assume that it's representative of what you're trying to do.

Update

I'm going to leave my original answer below, because I think it's still useful although not directly relevant to your question (possibly due to a morning caffeine deficit, who knows?).

Since your publisher calls send with a single message and then exits immediately, the subscribers probably never have time to connect. You would typically expect the code calling bind to be the long-running process. If you insert a sleep between the bind and send calls, things will work as expected:

import time
import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind('tcp://*:5555')

# Give subscribers time to connect and subscribe before sending.
time.sleep(1)
pub.send(b'message 1')

This works fine, assuming the following subscriber:

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix: subscribe to everything
sub.connect('tcp://localhost:5555')

while True:
    msg = sub.recv()
    print(msg.decode())

However, this isn't a great solution: anything that relies on sleep for synchronization will eventually fail. A different approach is to have a long-running broker process open both a PUB socket and a REP socket. To send a message to all subscribers, your tool would open a REQ socket to the broker, which would then publish the message on its PUB socket to all subscribers.
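A minimal sketch of that broker idea, in the same pyzmq style as the examples here. The addresses, the function names run_broker and send_once, and the b"ok" acknowledgement are hypothetical choices for illustration, not part of any ZeroMQ API. The broker binds both sockets, so it is the long-running process; the one-shot tool only connects a REQ socket and waits for the reply.

```python
import threading
import time

import zmq

# Hypothetical addresses for this sketch. The broker binds both sockets,
# so it is the long-running process; everything else only connects.
PUB_ADDR = "tcp://127.0.0.1:5555"
REP_ADDR = "tcp://127.0.0.1:5556"


def run_broker(ctx, count):
    """Relay `count` messages from the REP socket out through the PUB socket."""
    pub = ctx.socket(zmq.PUB)
    pub.bind(PUB_ADDR)
    rep = ctx.socket(zmq.REP)
    rep.bind(REP_ADDR)
    for _ in range(count):
        msg = rep.recv()   # wait for a one-shot sender
        pub.send(msg)      # fan it out to every connected subscriber
        rep.send(b"ok")    # REP must reply before it can recv again
    pub.close()
    rep.close()


def send_once(ctx, msg):
    """What the short-lived tool would do: one request, one reply."""
    req = ctx.socket(zmq.REQ)
    req.connect(REP_ADDR)
    req.send(msg)
    reply = req.recv()     # returns only after the broker has called pub.send
    req.close()
    return reply


if __name__ == "__main__":
    ctx = zmq.Context()
    broker = threading.Thread(target=run_broker, args=(ctx, 1))
    broker.start()

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect(PUB_ADDR)
    # Demo only: stands in for a subscriber that was already running
    # before the tool sends anything.
    time.sleep(0.5)

    print(send_once(ctx, b"message 1"))  # the broker's acknowledgement
    print(sub.recv())                    # the published message

    sub.close()
    broker.join()
    ctx.term()
```

Because the REQ call blocks until the broker replies, the tool cannot exit before its message has been handed to the PUB socket. Subscribers still miss anything published before they connected, as with any PUB socket.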


Original answer follows


The problem you're hitting is probably this. Since your subscriber calls bind and your sender calls connect, you are likely simply losing the message, because there is a delay between a successful connection and the subscriber successfully subscribing to messages. Also see the question "Why do I see different behavior when I bind a socket versus connect a socket?" in the FAQ.

To demonstrate this with code, consider this subscriber:

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.bind('tcp://*:5555')

while True:
    msg = sub.recv()
    print(msg.decode())

And the following publisher:

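import time
import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.connect('tcp://localhost:5555')

pub.send(b'message 1')  # likely dropped: subscription not yet received
pub.send(b'message 2')  # likely dropped for the same reason
time.sleep(1)
pub.send(b'message 3')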

The subscriber will almost always print only:

message 3

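This is because the first two send operations happen too quickly: they run before the subscriber's subscription has reached the publisher, and a PUB socket discards messages that no subscriber has asked for yet.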

This is generally not considered a problem, since pub/sub is explicitly not a reliable transport mechanism. In the typical use case, there is a single publisher calling bind and multiple subscribers calling connect that may not all be present initially or continuously. There is no guarantee that every subscriber will receive every message.

If you need to reliably transport a single message, consider a REQ/REP socket pair instead.
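For comparison, a minimal REQ/REP sketch. The address and the names worker and send_reliably are hypothetical, as is the 5-second receive timeout, which is there only so the sender does not block forever if no worker is running. Unlike PUB, a REQ socket queues the message while the connection is being set up instead of dropping it, and the reply confirms that the message actually arrived.

```python
import threading

import zmq

ADDR = "tcp://127.0.0.1:5557"  # hypothetical address for this sketch


def worker(ctx, count, received):
    """Receive `count` messages, storing each one before acknowledging it."""
    rep = ctx.socket(zmq.REP)
    rep.bind(ADDR)
    for _ in range(count):
        received.append(rep.recv())
        rep.send(b"ack")  # the reply doubles as a delivery confirmation
    rep.close()


def send_reliably(ctx, msg, timeout_ms=5000):
    """Send one message and wait for the worker's acknowledgement."""
    req = ctx.socket(zmq.REQ)
    req.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv raises zmq.Again on timeout
    req.setsockopt(zmq.LINGER, 0)             # don't block close() on unsent data
    req.connect(ADDR)
    try:
        req.send(msg)  # queued, not dropped, while the connection is set up
        return req.recv()
    finally:
        req.close()


if __name__ == "__main__":
    ctx = zmq.Context()
    got = []
    t = threading.Thread(target=worker, args=(ctx, 1, got))
    t.start()
    print(send_reliably(ctx, b"message 1"))
    t.join()
    print(got)
    ctx.term()
```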

Upvotes: 2
