nick huang

Reputation: 463

Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?

Please see the code below:


server.py

import zmq 
import time
from multiprocessing import Process
class A:
  def __init__(self):
    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.PUB)
    sock.bind('ipc://test')
    p = Process(target=A.run, args=(sock,))
    p.start()     # run() executes in the child process, but the client receives nothing
    p.join()
    #A.run(sock)  # calling run() directly works: the client receives the messages

  @staticmethod
  def run(sock):
    while True:
      sock.send('demo'.encode('utf-8'))
      print('sent')
      time.sleep(1)

if __name__ =='__main__':
  a = A()

client.py

import zmq 
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '') 
while True:
  print(sock.recv())

In the constructor in server.py, if I call the run() method directly, the client receives the messages, but when I run it through multiprocessing.Process, the client receives nothing. Can anyone explain this and offer some advice?

Upvotes: 6

Views: 1681

Answers (2)

user3666197

Reputation: 1

Q : "Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?"

Well, ZeroMQ does not fail to communicate; the problem is how Python's multiprocessing module "operates".

The module is designed so that some processing can escape Python's central GIL lock (a re-[SERIAL]-iser that acts as an ever-present avoider of [CONCURRENT] situations).

This means that a call to multiprocessing.Process puts a "mirror copy" of the Python interpreter state into a new process spawned by the O/S (how exact that copy is depends on the localhost O/S and on the start method in use).

Given that, there is zero chance a "mirror"-ed replica could get access to resources already owned by __main__. Here the .bind() method has already acquired the ipc://test address, so the "remote" process will never get "permission" to touch this ZeroMQ AccessPoint unless the code is repaired and fully refactored.
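
The copy behaviour can be seen without ZeroMQ at all. The following is only a minimal sketch using the standard library; it assumes a POSIX system where the "fork" start method is available, and state, bump and demo are hypothetical names chosen for illustration:

```python
import multiprocessing as mp

# Module-level state that exists in the parent before the child is started.
state = {"counter": 0}


def bump(queue):
    # Runs in the child: it modifies the child's own copy of `state`.
    state["counter"] += 1
    queue.put(state["counter"])


def demo():
    ctx = mp.get_context("fork")  # assumption: POSIX, fork is available
    queue = ctx.Queue()
    child = ctx.Process(target=bump, args=(queue,))
    child.start()
    seen_by_child = queue.get(timeout=5)
    child.join()
    # The parent's copy is untouched by what the child did.
    return seen_by_child, state["counter"]
```

The child reports 1 while the parent still sees 0, so anything the child does to an object it inherited stays on the child's side of the process boundary.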

Q : "Can anyone explain on this and provide some advice?"

Sure. The best first step is to fully understand the Pythonic culture of monopolistic GIL-lock re-[SERIAL]-isation, where no two things ever happen at the same time, so even adding more threads does not speed up CPU-bound processing, as it all gets re-aligned by the central "monopolist", the GIL lock.

Next, the promise of a fully reflected copy of the Python interpreter state, while it sounds attractive, also has some obvious drawbacks: the new processes, being "mirror" copies, cannot safely use resources that the original process already owns. If they try to, code that does not work as expected is the milder of the problems in such principally ill-designed cases.

In your code, the first line in __main__ instantiates a = A(), where A's .__init__ method immediately occupies the IPC resource through .bind('ipc://test'). The later step, p = Process( target = A.run, args = ( sock, ) ), "mirror"-replicates the state of the Python interpreter (an as-is copy), so after p.start() the child only holds a copy of the socket object, while the resource itself (the bound ipc://test and the Context that services it) is still owned by __main__. Sending through that copy will never fly.
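
One way to repair the server while keeping the original layout (publisher binds, client connects) might be to create the Context and the socket inside the child and pass only the endpoint string to it. This is a sketch under assumptions, not a definitive implementation: it assumes pyzmq and a POSIX system with the "fork" start method, it sends a finite number of messages so that it terminates, and publish, demo and the temporary endpoint path are hypothetical:

```python
import multiprocessing as mp
import os
import tempfile
import time

import zmq


def publish(endpoint, count=40, delay=0.05):
    # Runs in the child: the Context and the socket are created here,
    # so they are owned by the child and never cross the process boundary.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(endpoint)
    for i in range(count):
        sock.send(f"demo {i}".encode("utf-8"))
        time.sleep(delay)
    sock.close(linger=0)
    ctx.term()


def demo(wanted=3):
    # Hypothetical endpoint inside a fresh temporary directory.
    endpoint = "ipc://" + os.path.join(tempfile.mkdtemp(), "test")
    # Only a string is handed to the child, never a socket.
    child = mp.get_context("fork").Process(target=publish, args=(endpoint,))
    child.start()

    # The client side, here in the parent for brevity.
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVTIMEO, 5000)  # raise zmq.Again instead of blocking forever
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    sub.connect(endpoint)
    received = [sub.recv() for _ in range(wanted)]
    sub.close(linger=0)
    ctx.term()
    child.join()
    return received
```

Calling demo() should return three messages of the form b'demo N'. The first few messages are usually lost, because a PUB socket drops whatever it sends before a subscriber has finished connecting.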

Last but not least, enjoy the Zen-of-Zero, the masterpiece of Martin SUSTRIK, so well crafted as an ultimately scalable, almost zero-latency, very comfortable, widely ported signalling & messaging framework.

Upvotes: 2

SamR

Reputation: 336

Short answer: Start your subprocesses first. Create your zmq.Context and zmq.Socket instances inside the Producer.run() method, so that each subprocess builds its own. Use .bind() on the side whose cardinality is 1, and .connect() on the side whose cardinality is >1 (in this case, the "server").

My approach would be structured something like...

# server.py :

    import zmq
    from multiprocessing import Process

    class Producer (Process):
    
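      def __init__(self):
        super().__init__()  # required so that Process itself is set up
        ...
    
      def run(self):
        # Executed in the child process once start() has been called
        ctx = zmq.Context(1)
        sock = zmq.Socket(ctx, zmq.PUB)
        # Multiple producers, so connect instead of bind (consumer must bind)
        sock.connect('ipc://test')
        while True:
          ...
    
    if __name__ == "__main__":
      producer = Producer()
      producer.start()  # Producer is itself a Process; start() runs run() in the child
      producer.join()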

# client.py :

    import zmq

    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.SUB)
    # Capture from multiple producers, so bind (producers must connect)
    sock.bind('ipc://test')
    sock.setsockopt_string(zmq.SUBSCRIBE, '') 
    while True:
      print(sock.recv())
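
To try the layout above end to end, here is a finite variant that could be run as a single file. It is only a sketch: it assumes pyzmq and a POSIX system with the "fork" start method, and produce, collect, the producer names and the temporary endpoint path are hypothetical stand-ins for the elided parts:

```python
import multiprocessing as mp
import os
import tempfile
import time

import zmq


def produce(endpoint, name, count=40, delay=0.05):
    # Each producer builds its own Context and socket inside its own process.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(endpoint)  # many producers, so they connect
    for i in range(count):
        sock.send_string(f"{name} {i}")
        time.sleep(delay)
    sock.close(linger=0)
    ctx.term()


def collect(producer_names=("p1", "p2")):
    endpoint = "ipc://" + os.path.join(tempfile.mkdtemp(), "test")
    fork = mp.get_context("fork")
    children = [fork.Process(target=produce, args=(endpoint, n))
                for n in producer_names]
    for child in children:
        child.start()

    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVTIMEO, 5000)  # raise zmq.Again instead of blocking forever
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    sub.bind(endpoint)  # single consumer, so it binds
    seen = set()
    while seen != set(producer_names):
        # Record which producer each message came from.
        seen.add(sub.recv_string().split()[0])
    sub.close(linger=0)
    ctx.term()
    for child in children:
        child.join()
    return seen
```

collect() returns once it has heard from every producer, which shows that several independent processes can feed the one bound SUB socket.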

Upvotes: 4
