JACK M
JACK M

Reputation: 2841

Python pyzmq: program stucks

It is a simple PUB/SUB program using pyzmq and multiprocessing.

The server is PUB. It sends a slice of an ahah list to client SUB every time.

The client SUB first .recv_string() one message, then it changes the socket .recv_string()-processing mode to a NOBLOCK one, inside the .Poller() loop.

import logging
import zmq
from multiprocessing import Process

def server_init(port_pub):
  ahah = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  index = 0
  num = 2

  context = zmq.Context()
  socket_pub = context.socket(zmq.PUB)
  socket_pub.bind("tcp://127.0.0.1:%s" % port_pub)

  # socket_rep = context.socket(zmq.REP)
  # socket_rep.bind("tcp://*:%s" % port_rep)

  socket_pub.send_string(' '.join(str(v) for v in ahah[index : index + num - 1]))
  index = index + num

  poller_pub = zmq.Poller()
  poller_pub.register(socket_pub, zmq.POLLOUT)

  should_continue = True
  while should_continue:
    socks = dict(poller_pub.poll())
    if socket_pub in socks and socks[socket_pub] == zmq.POLLOUT and index <= 9:
      socket_pub.send_string(' '.join(str(v) for v in ahah[index : index + num - 1]), zmq.NOBLOCK)
      index = index + num
    else:
      should_continue = False
      poller_pub.unregister(socket_pub)

def client(port_sub):
  context = zmq.Context()
  socket_sub = context.socket(zmq.SUB)
  socket_sub.connect("tcp://127.0.0.1:%s" % port_sub)

  tmp = socket_sub.recv_string()
  process_message(tmp)

  poller_sub = zmq.Poller()
  poller_sub.register(socket_sub, zmq.POLLIN)

  should_continue = True
  while should_continue:
    socks = dict(poller_sub.poll())
    if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
      tmp = socket_sub.recv_string(zmq.NOBLOCK)
      process_message(tmp)
    else:
      should_continue = False
      poller_pub.unregister(socket_sub)

def process_message(msg):
  print("Processing ... %s" % msg)


if __name__ == '__main__':
  logging.info('starting')
  Process(target=server_init, args=(5566,)).start()
  Process(target=client, args=(5566,)).start()

When I launch the program, it just stucks there and outputs nothing like:

$ python test.py

Until after a Ctrl-C is pressed:

$ python test2.py
^CProcess Process-2:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "test2.py", line 38, in client
    tmp = socket_sub.recv_string()
  File "/Users/jack/.pyenv/versions/3.5.1/lib/python3.5/site-packages/zmq/sugar/socket.py", line 402, in recv_string
    b = self.recv(flags=flags)
  File "zmq/backend/cython/socket.pyx", line 674, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:6971)
  File "zmq/backend/cython/socket.pyx", line 708, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:6763)
  File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1931)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7222)
KeyboardInterrupt

I think the client should at least .recv() one msg.

But why not?

Upvotes: 0

Views: 541

Answers (1)

user3666197
user3666197

Reputation: 1

Why?

Besides other reasons, your client side has simply forgotten to subscribe to anything meaningful before calling the first .recv_string(), thus it hangs forever in a blocking-mode receive, while there is nothing that can meet the SUB-side TOPIC-filter on received messages and thus no such one will ever pass to .recv_string()-processing.

Just add socket_sub.setsockopt( "" ) as the ZeroMQ default is to rather subscribe to nothing ( as no one can indeed guess any magic of what shall pass the TOPIC-filter in one's actual context, so as a paradox, nothing seems to be the best choice available in this sense ).

Next also be careful on timing ( .bind() / .connect() ) sensitivity.

For more details, do not hesistate to download and read the fabulous Pieter HINTJENS' book "Code Connected, Volume 1".

Upvotes: 1

Related Questions