Reputation: 357
This is related to my earlier problem which I'm still working on solving. Essentially I need the inverse design of ProcessPoolExecutor, where I have many querying processes and one worker which calculates and sends back results in batches.
Sending the work items is easy with one shared queue, but I still don't have a clean way to send the results back to the right threads in the right processes.
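To make the setup concrete, here is a minimal sketch of the part that already works: every query process tags its items and puts them on one shared queue for the single worker. The names (query, worker, work_queue) are just placeholders, and routing the results back is the open question:

#!/usr/bin/env python3
# Minimal sketch of the setup described above; names are placeholders.
import multiprocessing as mp

def query(my_id, work_queue):
    # Each querying process tags its items with its own id
    for item in 'abc':
        work_queue.put((my_id, item))

def worker(work_queue, nquery_procs, nitems_per_proc):
    for _ in range(nquery_procs * nitems_per_proc):
        sender_id, item = work_queue.get()
        result = item.upper()  # "process" the item
        # ...but how do I get `result` back to the thread in process `sender_id`?
        print(sender_id, result)

if __name__ == '__main__':
    work_queue = mp.Queue()
    procs = [mp.Process(target=query, args=(i, work_queue)) for i in range(4)]
    for p in procs:
        p.start()
    worker(work_queue, nquery_procs=4, nitems_per_proc=3)
    for p in procs:
        p.join()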
Upvotes: 0
Views: 130
Reputation: 6440
I think it makes the most sense to have a separate multiprocessing.Pipe
for each querying process. The worker process waits for an item to become available on any of the pipes, then dequeues and processes it, keeping track of which pipe it came from. When it's time to send data back, it feeds the results onto the correct pipe.
Here's a simple example:
#!/usr/bin/env python3
import multiprocessing as mp
import multiprocessing.connection  # make mp.connection.wait available explicitly

def worker(pipes):
    quit = [False] * len(pipes)
    results = [''] * len(pipes)
    # Wait for every query proc to send None before quitting
    while not all(quit):
        ready = mp.connection.wait(pipes)
        for pipe in ready:
            # Get index of query proc's pipe
            i = pipes.index(pipe)
            # Receive and "process"
            obj = pipe.recv()
            if obj is None:
                quit[i] = True
                continue
            result = str(obj)
            results[i] += result
            # Send back to query proc
            pipes[i].send(result)
    print(results)

def query(pipe):
    for i in 'do some work':
        pipe.send(i)
        assert pipe.recv() == i
    pipe.send(None)  # Send sentinel

if __name__ == '__main__':
    nquery_procs = 8
    work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))
    query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes]
    for p in query_procs:
        p.start()
    worker(work_pipes)
    for p in query_procs:
        p.join()
Alternatively, you could give each querying process an ID number (which might just be its pipe's index) and require every request to be a tuple of (id_num, data)
. This just saves the worker process from doing pipes.index(pipe)
on each loop, so I'm not sure how much it buys you.
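A rough sketch of that variant, assuming the sentinel also carries the id as (id, None) so the worker never has to look up which pipe a message arrived on:

import multiprocessing as mp
import multiprocessing.connection

def worker(pipes):
    quit = [False] * len(pipes)
    results = [''] * len(pipes)
    while not all(quit):
        for pipe in mp.connection.wait(pipes):
            # Every message is (id_num, data); no pipes.index() needed
            i, data = pipe.recv()
            if data is None:           # sentinel from query proc i
                quit[i] = True
                continue
            result = str(data)
            results[i] += result
            pipes[i].send(result)      # reply on the pipe for that id
    print(results)

def query(my_id, pipe):
    for c in 'do some work':
        pipe.send((my_id, c))          # tag each request with this proc's id
        assert pipe.recv() == c
    pipe.send((my_id, None))           # sentinel is (id, None)

if __name__ == '__main__':
    nquery_procs = 8
    work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))
    query_procs = [mp.Process(target=query, args=(n, pipe))
                   for n, pipe in enumerate(query_pipes)]
    for p in query_procs:
        p.start()
    worker(work_pipes)
    for p in query_procs:
        p.join()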
Upvotes: 1