Reputation: 6904
I am currently exploring possibilities to test my zeromq applications. I was under the impression that I could just have a publisher/subscriber in the same thread, let the publisher publish and the subscriber subscribe to it without losing messages. Yet, when I let the publisher send a couple of messages, none gets through to the subscriber.
Here is the code I use:
import zmq
def main():
ctx = zmq.Context.instance()
sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.HWM, 1000)
sender.bind('tcp://*:10001')
rcvr = ctx.socket(zmq.SUB)
rcvr.setsockopt(zmq.HWM, 1000)
rcvr.connect('tcp://127.0.0.1:10001')
rcvr.setsockopt(zmq.SUBSCRIBE, "")
for i in range(100):
sender.send('%i' % i)
while True:
try:
print rcvr.recv(zmq.NOBLOCK)
except zmq.ZMQError:
break
if __name__ == '__main__':
main()
When running this, I don't get any output.
What strikes me is that the receiver is connected before the sender sends, and thus should be queuing those messages. Or is that an assumption that is plain wrong and I should use PUSH/PULL instead?
Upvotes: 2
Views: 2528
Reputation: 21
I think this is a case of the slow joiner problem, described in the ZeroMQ guide.
This "slow joiner" symptom hits enough people often enough that we're going to explain it in detail.
I believe the main issue is that all the messages have been sent before the subscriber socket starts listening, and the messages fly by and get dropped. Putting a delay between setting up the sockets and sending the messages doesn't work because the last messages has been sent before the receiver starts listening.
As you have suggested, the push/pull sockets do queue jobs in memory. You can send jobs between sockets in a single process like this
# pushpull.py
import zmq
def main():
ctx = zmq.Context()
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:10001')
rcvr = ctx.socket(zmq.PULL)
rcvr.connect('tcp://127.0.0.1:10001')
for i in range(100):
sender.send_unicode('%i' % i)
while True:
msg = rcvr.recv()
print(msg)
if __name__ == '__main__':
main()
Or if you want to use the pub/sub sockets, we need two processes and a time.sleep(1)
between the socket setup and the message send-off:
First start the receiver
# rcvr.py
import zmq
def main():
ctx = zmq.Context()
rcvr = ctx.socket(zmq.SUB)
rcvr.connect('tcp://127.0.0.1:10001')
rcvr.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
msg = rcvr.recv()
print(msg)
if __name__ == '__main__':
main()
Then the sender,
# sender.py
import zmq
import time
def main():
ctx = zmq.Context()
sender = ctx.socket(zmq.PUB)
sender.bind('tcp://*:10001')
time.sleep(1)
for i in range(100):
sender.send_unicode('%i' % i)
if __name__ == "__main__":
main()
to receive:
b'0'
b'1'
b'2'
b'3' ...
I'm working in Python 3.3 and pyzmq 13.1.0 at the moment with the awesome WinPython distribution, so some of the string handling in the zmq calls is a little different, as well as the print function. Hope it helps.
Upvotes: 2
Reputation: 38588
What strikes me is that the receiver is connected before the sender sends
This isn't actually true - the receiver has started the connection process, but that does not mean that the process has completed. Connections are asynchronous.
If you are actually using this for intra-process communication, I would recommend using the inproc
transport, where this is not an issue:
url = 'inproc://whatever'
sender.bind(url)
...
recvr.connect(url)
Upvotes: 0
Reputation: 11
You should be connecting your SUB socket to port 10000 not 10001. Currently the SUB socket is waiting for a publisher and the PUB socket is waiting for a subscriber. 0mq's feature of allowing 'clients' to connect without 'servers' already being present also means there is no error thrown when you connect to port 10001 and that is by design.
Upvotes: 1