Reputation: 311
How can I get data back from one (provider) Process to another (requester) Process that made a request of it? This needs to work when the requester process may not have been running when the provider process was started.
I know the provider process can have a Queue to receive requests from multiple requesters, but how can it return data to the requester process that put the request on the queue? How can the requester process know when the data is available?
Simplified Example
Process A requests data from Process Z and waits for Z to return it. Process B is started and independently requests data from Process Z. Process Z handles the requests in order, returning one set of data to A and another to B.
Process Z needs to run continually while Processes A, B, C ... etc. may come and go.
N.B. Briefly, this is needed because Process Z is managing requests to an external resource which does not handle concurrent requests.
Thanks for any help and suggestions.
Julian
Upvotes: 0
Views: 722
Reputation: 44108
In the following demo I have chosen to make "Process Z" a daemon process, meaning that it will automatically terminate when all non-daemon, i.e. "regular" processes, terminate. Alterntively, you can make this a regular process and put in its input queue a special sentinel request value such as (None, None)
to signal to it to terminate.
The idea is that Process Z will be initialized with an input queue to which other processes will put
requests. Each request is a tuple. For this demo the first element of the tuple is a value to be squared and the second element is a queue to which the result is to be put
to. So each process that is making a request of Process Z will be passing its own result queue instance. The important thing to note here is that you cannot put
an instance of a multiprocessing.Queue
object to another multiprocessing.Queue
instance: you will get a RuntimeError: Queue objects should only be shared between processes through inheritance. Therefore, these result queues must instead be managed queue instances created by an instance of a multiprocessing.SynchManager
, which is returned by calling multiprocessing.Manager()
:
from multiprocessing import Process, Queue, Manager
def process_z(input_q):
while True:
x, result_q = input_q.get()
result_q.put(x ** 2)
def process_a(input_q, result_q):
input_q.put((3, result_q))
result = result_q.get()
print('3 ** 2 =', result)
def process_b(input_q, result_q):
input_q.put((7, result_q))
result = result_q.get()
print('7 ** 2 =', result)
input_q.put((5, result_q))
result = result_q.get()
print('5 ** 2 =', result)
for x in range(10, 14):
input_q.put((x, result_q))
for x in range(10, 14):
result = result_q.get()
print(f'{x} ** 2 =', result)
# required by Windows:
if __name__ == '__main__':
with Manager() as manager:
input_q = Queue()
# make "Process Z" a daemon process that will end when all non-daemon processes end:
Process(target=process_z, args=(input_q,), daemon=True).start()
result_q_a = manager.Queue()
p_a = Process(target=process_a, args=(input_q, result_q_a))
p_a.start()
result_q_b = manager.Queue()
p_b = Process(target=process_b, args=(input_q, result_q_b))
p_b.start()
# wait for completion of non-daemon processes:
p_a.join()
p_b.join()
Prints:
3 ** 2 = 9
7 ** 2 = 49
5 ** 2 = 25
10 ** 2 = 100
11 ** 2 = 121
12 ** 2 = 144
13 ** 2 = 169
Upvotes: 1