Reputation: 1578
I have a multiprocessing system, with a main process and two child ones.
One of the child processes (say, C1) sends messages to the other child (C2) through a Queue. C2 analyzes the messages from C1 and, when certain conditions occur, it requires some input from the user.
So, the situation looks like this:
main.py
from child_process_01 import Child_Process_01
from child_process_02 import Child_Process_02
set_start_method("spawn")
c1_c2_q = Queue(maxsize=1)
c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
c1.daemon = True
c1.start()
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q,))
c2.daemon = True
c2.start()
Child_Process_01
message = produced somehow
c1_c2_q.put((message))
Child_Process_02
## infinite loop, keep checking messages and ask for user input under certain conditions
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
The code doesn't work as it is, since the multiprocessing module closes the standard input of all processes it creates, as I found out in many answers here. A good suggestion seemed to redefine the standard in in the main process and to send it to the child, like explained here or here, so I tried it:
main.py
newstdin = os.fdopen(os.dup(sys.stdin.fileno()))
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, newstdin))
c2.daemon = True
c2.start()
Child_Process_02
def Child_Process_02(c1_c2_q, newstdin):
sys.stdin = newstdin
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
but this doesn't work either, because I can't pass the newstdin object created in the main process through a queue, I get the error:
TypeError: cannot pickle '_io.TextIOWrapper' object
Moreover, a few comments to similar question discouraged the practice of reading input from child processes in general, but I can't imagine how to do it differently. Any suggestions?
Upvotes: 0
Views: 831
Reputation: 44118
First of all you should modify Child_Process_02 so as to not burn so many CPU cycles doing unproductive calls to c1_c2_q.get_nowait()
by using the blocking call c1_c2_q.get()
, which will not return until a message is received (and you can get rid of the try/except
scaffolding around this call).
Second, and this is optional, since messages you are passing from Child_Process_01 to Child_Process_02 have one producer and one consumer, you can gain efficiency by replacing multiprocessing.Queue
with a call to multiprocessing.Pipe(duplex=False)
, which will return two unidirectional multiprocessing.connection.Connection
instances the second of which is suitable for sending arbitrary objects such as strings and the first of which for receiving objects (by the way the multiprocessing.Queue
is implemented on top of the Pipe
). But a full-duplex Pipe
where each connection can be used for both sending and receiving is definitely what you want for the following program where a thread started by the main process is "listening" for requests to do input
calls by calling multiprocessing.connection.Connection.recv()
in a loop. The message returned by this call is the prompt string to be used for the call to input
. The value returned by the input
function will then be sent
back on the connection:
from multiprocessing import Process, Pipe
from threading import Thread
def inputter(input_conn):
""" get requests to do input calls """
while True:
input_msg = input_conn.recv()
value = input(input_msg)
input_conn.send(value) # send inputted value:
def worker(msg_conn, input_conn):
while True:
message = msg_conn.recv()
if message is None:
break
if message == 'do input':
# send inputter our prompt message:
input_conn.send('Enter x: ')
# get back the result of the input:
x = (int)(input_conn.recv())
print('The value entered was', x)
else:
print('Got message:', message)
if __name__ == '__main__':
import time
# create the connections for sending messages from one process to another:
recv_conn, send_conn = Pipe(duplex=False)
# create the connections for doing the input requests:
input_conn1, input_conn2 = Pipe(duplex=True) # each connection is bi-drectional
# start the inputter thread with one of the inputter duplex connections:
t = Thread(target=inputter, args=(input_conn1,), daemon=True)
t.start()
# start a child process with the message connection in lieu of a Queue
# and the other inputter connection:
p = Process(target=worker, args=(recv_conn, input_conn2))
p.start()
# send messages to worker process:
send_conn.send('a')
send_conn.send('do input')
send_conn.send('b')
# signal the child process to terminate:
send_conn.send(None)
p.join()
Prints:
Got message: a
Enter x: 8
The value entered was 8
Got message: b
NOTE
It should be noted that a multiprocessing.Queue
starts a feeder thread for the underlying multiprocessing.Pipe
to prevent the "putter" from prematurely blocking before maxsize calls to put
have been made where maxsize is the value used to instantiate the Queue. But this is also why the multiprocessing.Queue
does not perform as well as the multiprocessing.Pipe
. But this also means that calling send
repeatedly on a connection without corresponding recv
calls on the other end will eventually block. But given that you have specified maxsize=1 on your queue, this is hardly an issue as you would be blocking under the same circumstances with your queue.
Upvotes: 1