Julian7

Reputation: 311

How to return data from a Python process to the process that requested it

How can I get data back from one (provider) Process to another (requester) Process that made a request of it? This needs to work when the requester process may not have been running when the provider process was started.

I know the provider process can have a Queue to receive requests from multiple requesters, but how can it return data to the requester process that put the request on the queue? How can the requester process know when the data is available?

Simplified Example

Process A requests data from Process Z and waits for Z to return it. Process B is started and independently requests data from Process Z. Process Z handles the requests in order, returning one set of data to A and another to B.

Process Z needs to run continually while Processes A, B, C ... etc. may come and go.

N.B. Briefly, this is needed because Process Z is managing requests to an external resource which does not handle concurrent requests.

Thanks for any help and suggestions.

Julian

Upvotes: 0

Views: 722

Answers (1)

Booboo

Reputation: 44108

In the following demo I have chosen to make "Process Z" a daemon process, meaning that it will automatically terminate when all non-daemon (i.e. "regular") processes terminate. Alternatively, you can make it a regular process and put a special sentinel request, such as (None, None), on its input queue to signal it to terminate.

The idea is that Process Z is initialized with an input queue to which other processes put requests. Each request is a tuple: for this demo, the first element is a value to be squared and the second element is the queue on which the result should be returned. Each process making a request of Process Z therefore passes its own result queue instance, and a blocking result_q.get() call is how the requester knows when its data is available. The important thing to note here is that you cannot put a multiprocessing.Queue instance onto another multiprocessing.Queue instance: you will get RuntimeError: Queue objects should only be shared between processes through inheritance. Therefore, these result queues must instead be managed queue instances created by a multiprocessing.managers.SyncManager instance, which is returned by calling multiprocessing.Manager():

from multiprocessing import Process, Queue, Manager

def process_z(input_q):
    # serve requests one at a time, forever (terminated implicitly as a daemon):
    while True:
        x, result_q = input_q.get()
        result_q.put(x ** 2)

def process_a(input_q, result_q):
    input_q.put((3, result_q))
    result = result_q.get()
    print('3 ** 2 =', result)

def process_b(input_q, result_q):
    input_q.put((7, result_q))
    result = result_q.get()
    print('7 ** 2 =', result)

    input_q.put((5, result_q))
    result = result_q.get()
    print('5 ** 2 =', result)

    # queue several requests before reading any results; Process Z handles
    # them in order, so the results arrive on result_q in the same order:
    for x in range(10, 14):
        input_q.put((x, result_q))
    for x in range(10, 14):
        result = result_q.get()
        print(f'{x} ** 2 =', result)


# required by Windows:
if __name__ == '__main__':
    with Manager() as manager:
        input_q = Queue()
        # make "Process Z" a daemon process that will end when all non-daemon processes end:
        Process(target=process_z, args=(input_q,), daemon=True).start()

        result_q_a = manager.Queue()
        p_a = Process(target=process_a, args=(input_q, result_q_a))
        p_a.start()

        result_q_b = manager.Queue()
        p_b = Process(target=process_b, args=(input_q, result_q_b))
        p_b.start()

        # wait for completion of non-daemon processes:
        p_a.join()
        p_b.join()

Prints:

3 ** 2 = 9
7 ** 2 = 49
5 ** 2 = 25
10 ** 2 = 100
11 ** 2 = 121
12 ** 2 = 144
13 ** 2 = 169
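For completeness, here is a minimal sketch of the non-daemon alternative mentioned at the top: "Process Z" runs as a regular process and shuts down cleanly when it receives the (None, None) sentinel, which also makes it joinable. The structure mirrors the demo above; the sentinel check is the only addition.

```python
from multiprocessing import Process, Queue, Manager

def process_z(input_q):
    while True:
        x, result_q = input_q.get()
        if x is None:  # sentinel received: shut down cleanly
            break
        result_q.put(x ** 2)

if __name__ == '__main__':
    with Manager() as manager:
        input_q = Queue()
        p_z = Process(target=process_z, args=(input_q,))  # regular, not daemon
        p_z.start()

        result_q = manager.Queue()
        input_q.put((4, result_q))
        print('4 ** 2 =', result_q.get())

        input_q.put((None, None))  # tell Process Z to terminate
        p_z.join()                 # joinable, unlike the daemon version
```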

Upvotes: 1
