Reputation: 2841
It is a simple PUB/SUB program using pyzmq and multiprocessing.
The server is PUB. It sends a slice of the ahah list to the client SUB each time.
The client SUB first calls .recv_string() for one message, then switches its .recv_string() processing to NOBLOCK mode inside the .Poller() loop.
import logging
import zmq
from multiprocessing import Process

def server_init(port_pub):
    ahah = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    index = 0
    num = 2
    context = zmq.Context()
    socket_pub = context.socket(zmq.PUB)
    socket_pub.bind("tcp://127.0.0.1:%s" % port_pub)
    # socket_rep = context.socket(zmq.REP)
    # socket_rep.bind("tcp://*:%s" % port_rep)
    socket_pub.send_string(' '.join(str(v) for v in ahah[index : index + num - 1]))
    index = index + num
    poller_pub = zmq.Poller()
    poller_pub.register(socket_pub, zmq.POLLOUT)
    should_continue = True
    while should_continue:
        socks = dict(poller_pub.poll())
        if socket_pub in socks and socks[socket_pub] == zmq.POLLOUT and index <= 9:
            socket_pub.send_string(' '.join(str(v) for v in ahah[index : index + num - 1]), zmq.NOBLOCK)
            index = index + num
        else:
            should_continue = False
    poller_pub.unregister(socket_pub)

def client(port_sub):
    context = zmq.Context()
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect("tcp://127.0.0.1:%s" % port_sub)
    tmp = socket_sub.recv_string()
    process_message(tmp)
    poller_sub = zmq.Poller()
    poller_sub.register(socket_sub, zmq.POLLIN)
    should_continue = True
    while should_continue:
        socks = dict(poller_sub.poll())
        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            tmp = socket_sub.recv_string(zmq.NOBLOCK)
            process_message(tmp)
        else:
            should_continue = False
    poller_pub.unregister(socket_sub)

def process_message(msg):
    print("Processing ... %s" % msg)

if __name__ == '__main__':
    logging.info('starting')
    Process(target=server_init, args=(5566,)).start()
    Process(target=client, args=(5566,)).start()
When I launch the program, it just hangs and prints nothing:
$ python test.py
It stays that way until I press Ctrl-C:
$ python test2.py
^CProcess Process-2:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/popen_fork.py", line 29, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test2.py", line 38, in client
tmp = socket_sub.recv_string()
File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/site-packages/zmq/sugar/socket.py", line 402, in recv_string
b = self.recv(flags=flags)
File "zmq/backend/cython/socket.pyx", line 674, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:6971)
File "zmq/backend/cython/socket.pyx", line 708, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:6763)
File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1931)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7222)
KeyboardInterrupt
I think the client should at least .recv() one message.
Why doesn't it?
Upvotes: 0
Views: 541
Reputation: 1
Besides other reasons, your client side has simply forgotten to subscribe to anything before calling the first .recv_string(). It therefore hangs forever in a blocking-mode receive: with no subscription set, no incoming message can meet the SUB-side TOPIC-filter, so none will ever be passed on to .recv_string() processing.
Just add socket_sub.setsockopt( zmq.SUBSCRIBE, b"" ) ( or the equivalent socket_sub.setsockopt_string( zmq.SUBSCRIBE, "" ) ) before the first receive. An empty prefix matches every message. This is needed because the ZeroMQ default is to subscribe to nothing ( no one can guess what ought to pass the TOPIC-filter in one's actual context, so, paradoxically, nothing is the best default available ).
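As a minimal sketch of the difference the subscription makes, the snippet below runs a PUB and a SUB socket in one process. The helper name first_message, the random port, and the 0.5 s settling delay are assumptions made for illustration, not part of the original program; a poll timeout replaces the blocking receive so the unsubscribed case returns instead of hanging.

```python
import time
import zmq


def first_message(subscribe, timeout_ms=500):
    """Return the first message the SUB socket gets, or None on timeout.

    Hypothetical helper for illustration only.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    sub = ctx.socket(zmq.SUB)
    # Do not let unsent messages delay shutdown.
    pub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.LINGER, 0)
    try:
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        sub.connect("tcp://127.0.0.1:%d" % port)
        if subscribe:
            # Empty prefix: accept every message.
            sub.setsockopt_string(zmq.SUBSCRIBE, "")
        # Assumed settling delay so the connection (and the subscription,
        # if any) reaches the publisher before it sends.
        time.sleep(0.5)
        pub.send_string("1 2")
        # Wait with a timeout instead of blocking forever in recv_string().
        if sub.poll(timeout_ms, zmq.POLLIN):
            return sub.recv_string()
        return None
    finally:
        sub.close()
        pub.close()
        ctx.term()


if __name__ == "__main__":
    print("with subscription:   ", first_message(True))
    print("without subscription:", first_message(False))
```

Without the subscription the SUB side never sees the message, which is the hang from the question minus the timeout.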
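Next, also be careful about timing ( .bind() / .connect() ) sensitivity: a PUB socket silently drops whatever it sends before a subscriber has connected and its subscription has reached the publisher, so messages published right after .bind() can be lost.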
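To make the timing point concrete, here is a rough sketch in which the publisher starts sending immediately, without waiting for the subscriber. The function name collect, the message format, and the send interval are assumptions for illustration. Early messages may be dropped, so the first one received is not necessarily "msg 0"; the exact number lost depends on the machine.

```python
import time
import zmq


def collect(n_messages=50, interval_s=0.02):
    """Publish n_messages right away and return what the SUB side received.

    Hypothetical helper for illustration only.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    sub = ctx.socket(zmq.SUB)
    pub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.LINGER, 0)
    received = []
    try:
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        sub.connect("tcp://127.0.0.1:%d" % port)
        sub.setsockopt_string(zmq.SUBSCRIBE, "")
        for seq in range(n_messages):
            # No waiting for the subscriber: early sends may be dropped.
            pub.send_string("msg %d" % seq)
            time.sleep(interval_s)
            # Drain whatever has arrived so far without blocking.
            while sub.poll(0, zmq.POLLIN):
                received.append(sub.recv_string())
        # Give the last messages a moment to arrive.
        while sub.poll(200, zmq.POLLIN):
            received.append(sub.recv_string())
        return received
    finally:
        sub.close()
        pub.close()
        ctx.term()


if __name__ == "__main__":
    got = collect()
    print("received %d of 50, first was: %s" % (len(got), got[0] if got else None))
```

If every message matters, one common remedy is to have the subscriber signal readiness over a separate REQ/REP pair before the publisher starts, which is what the commented-out socket_rep lines in the question seem to have been heading toward.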
For more details, do not hesitate to download and read Pieter HINTJENS' fabulous book "Code Connected, Volume 1".
Upvotes: 1