Reputation: 366
I have a program written in Python, which kicks off several C++ binaries as subprocesses. I am using ZMQ to communicate between the two. The Python class is broken up such that Thread-A pushes data into one ZMQ socket, and Thread-B pulls data from a different ZMQ socket. So the block diagram of the system can be viewed as:
Thread-A ---port1---> C++ ---port2---> Thread-B
Thread-A and Thread-B are started from the same Python class, and the C++ is a binary which is kicked off by that same class using subprocess.Popen(...), and everything works great. When I try to convert Threads A and B to Processes, stuff breaks. When the ZMQ sock.send() is called, it says "resource temporarily unavailable." In order to force this exception, I have to call sock.send(..., zmq.NOBLOCK); if zmq.NOBLOCK is not there, the send function just hangs. Can someone help me explain this? Like I said, when Thread-A and Thread-B are threads, everything works, but when I convert them to processes, everything hangs.
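For reference, the "resource temporarily unavailable" message is the text of EAGAIN, which pyzmq surfaces as zmq.Again when a non-blocking send cannot proceed. The sketch below reproduces that in isolation with a PUSH socket that has no connected peer. The function name send_without_peer is made up for illustration, and the snippet assumes a pyzmq install where zmq.NOBLOCK is still available as an alias.

```python
import zmq


def send_without_peer():
    """Try a non-blocking send on a PUSH socket that nobody has connected to.

    Returns True if the send failed with EAGAIN (zmq.Again), False if it
    went through. The function name is hypothetical, for illustration only.
    """
    context = zmq.Context()
    sock = context.socket(zmq.PUSH)
    # Do not wait for undelivered messages when the socket is closed.
    sock.setsockopt(zmq.LINGER, 0)
    # Bind to any free local port; no PULL socket ever connects to it.
    sock.bind_to_random_port('tcp://127.0.0.1')
    try:
        # Without zmq.NOBLOCK this call would block until a peer appears.
        sock.send(b'hello', zmq.NOBLOCK)
        return False
    except zmq.Again:
        # EAGAIN: "Resource temporarily unavailable"
        return True
    finally:
        sock.close()
        context.term()


if __name__ == '__main__':
    print(send_without_peer())
```

So the error and the hang are two views of the same condition: the PUSH socket doing the send has no usable peer at that moment.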
Since I have a lot of moving parts in this code, I tried to put together a test program which shows a similar issue when I send data. Again, stuff works when I run the "thread" version of the code, but the "process" version breaks (hint: you can switch between the "threaded" and "processed" versions by commenting/uncommenting the appropriate lines in the code below). I'm using ZMQ 3.2.
#!/usr/bin/python
# a simple program to figure out what is going on between threading and
# process for ZMQ
import zmq
import threading
import time
import multiprocessing

class MainClass:
    def __init__(self):
        port = 11112
        s = SenderClass(port)
        r = ReceiverClass(port)
        r.start()
        s.start()

class SenderClass(threading.Thread):
# class SenderClass(multiprocessing.Process):
    def __init__(self, port):
        self.zmqContext = zmq.Context()
        self.pushSocket = self.zmqContext.socket(zmq.PUSH)
        self.pushSocket.bind('tcp://*:' + str(port))
        threading.Thread.__init__(self)
        # multiprocessing.Process.__init__(self)
        print 'sender init'

    def run(self):
        print 'sender running'
        for ii in range(0,10):
            print 'sent ' + str(ii)
            self.pushSocket.send(str(ii))
            time.sleep(1)
        self.pushSocket.send('KILL')

class ReceiverClass(threading.Thread):
# class ReceiverClass(multiprocessing.Process):
    def __init__(self, port):
        self.zmqContext = zmq.Context()
        self.pullSocket = self.zmqContext.socket(zmq.PULL)
        self.pullSocket.connect('tcp://localhost:' + str(port))
        threading.Thread.__init__(self)
        # multiprocessing.Process.__init__(self)
        print 'rx init'

    def run(self):
        print 'rx running'
        while True:
            rx = self.pullSocket.recv()
            if rx=='KILL':
                break
            print 'received = ' + rx

if __name__=='__main__':
    M = MainClass()
Upvotes: 2
Views: 1706
Reputation: 8502
You should first start the bind()-ing socket, and then connect() ...
UPDATE:
Also, when starting a new threading.Thread, the context can be created in the parent thread and passed to the child. With multiprocessing.Process, however, the zmq.Context should be created in the child process; otherwise zmq fails an assertion because a context is being accessed from a different process.
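A minimal sketch of that arrangement, under some assumptions: __init__ of a multiprocessing.Process subclass runs in the parent, while run() runs in the child, so the context and socket are created inside run(). The names Sender, Receiver, run_demo and the results queue are made up for illustration, the port is the one from the question, and the code uses Python 3 syntax rather than the Python 2 of the question.

```python
import multiprocessing

import zmq


class Sender(multiprocessing.Process):
    def __init__(self, port, count):
        super().__init__()
        # __init__ runs in the parent: keep only plain data, no ZMQ objects.
        self.port = port
        self.count = count

    def run(self):
        # run() executes in the child, so the context belongs to the child.
        context = zmq.Context()
        sock = context.socket(zmq.PUSH)
        sock.bind('tcp://127.0.0.1:%d' % self.port)
        for i in range(self.count):
            sock.send(str(i).encode())  # blocks until a PULL peer connects
        sock.send(b'KILL')
        sock.close()
        context.term()  # lets queued messages flush before the child exits


class Receiver(multiprocessing.Process):
    def __init__(self, port, results):
        super().__init__()
        self.port = port
        self.results = results  # multiprocessing.Queue back to the parent

    def run(self):
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.connect('tcp://127.0.0.1:%d' % self.port)
        while True:
            msg = sock.recv()
            if msg == b'KILL':
                break
            self.results.put(msg.decode())
        self.results.put(None)  # sentinel: nothing more to report
        sock.close()
        context.term()


def run_demo(port=11112, count=3):
    """Start the bind()-ing side first, then the connect()-ing side."""
    results = multiprocessing.Queue()
    sender = Sender(port, count)
    receiver = Receiver(port, results)
    sender.start()
    receiver.start()
    received = []
    while True:
        item = results.get()
        if item is None:
            break
        received.append(item)
    sender.join()
    receiver.join()
    return received


if __name__ == '__main__':
    print(run_demo())
```

The only structural change from the threaded version is where the ZMQ objects are created; the parent never touches a context or socket that a child later uses.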
Upvotes: 2