Akababa

Reputation: 357

Inverse of ProcessPoolExecutor in Python

This is related to my earlier problem which I'm still working on solving. Essentially I need the inverse design of ProcessPoolExecutor, where I have many querying processes and one worker which calculates and sends back results in batches.

Sending the work items is easy with one shared queue, but I still don't have a nice solution for sending all the results back to the right threads on the right processes.

Upvotes: 0

Views: 130

Answers (1)

bnaecker

Reputation: 6440

I think it makes the most sense to use a separate multiprocessing.Pipe for each querying process. The worker process waits for an item to become available on any pipe, then dequeues and processes it, keeping track of which pipe it came from. When it's time to send data back, it sends the results down the correct pipe.

Here's a simple example:

#!/usr/bin/env python3

import multiprocessing as mp
# Import the submodule explicitly so mp.connection.wait is always available;
# "import multiprocessing" alone is not guaranteed to load it
import multiprocessing.connection

def worker(pipes):
    quit = [False] * len(pipes)
    results = [''] * len(pipes)

    # Loop until every query process has sent its None sentinel
    while not all(quit):
        ready = mp.connection.wait(pipes)
        for pipe in ready:

            # Get index of query proc's pipe
            i = pipes.index(pipe)

            # Receive and "process"
            obj = pipe.recv()
            if obj is None:
                quit[i] = True
                continue
            result = str(obj)
            results[i] += result

            # Send back to query proc
            pipes[i].send(result)
    print(results)


def query(pipe):
    for i in 'do some work':
        pipe.send(i)
        assert pipe.recv() == i
    pipe.send(None) # Send sentinel

if __name__ == '__main__':
    nquery_procs = 8
    work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))

    query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes]
    for p in query_procs:
        p.start()
    worker(work_pipes)
    for p in query_procs:
        p.join()

Alternatively, you could give each querying process an ID number (which might just be its pipe's index) and require every request to be a tuple of (id_num, data). This avoids the worker calling pipes.index(pipe) on every loop iteration, though I'm not sure how much it buys you.
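A rough sketch of that tagged-request variant, assuming one shared queue for requests and a return pipe per process (the worker/query function names and the queue layout here are my own, not from the answer above):

```python
#!/usr/bin/env python3

import multiprocessing as mp

def worker(queue, pipes):
    # Requests arrive tagged with the sender's ID, so no
    # pipes.index(pipe) lookup is needed to route the reply
    results = [''] * len(pipes)
    done = 0
    while done < len(pipes):
        id_num, data = queue.get()
        if data is None:
            done += 1
            continue
        result = str(data)
        results[id_num] += result
        # Reply goes down the sender's dedicated return pipe
        pipes[id_num].send(result)
    return results

def query(id_num, queue, pipe):
    for item in 'do some work':
        queue.put((id_num, item))  # tag each request with our ID
        assert pipe.recv() == item
    queue.put((id_num, None))  # sentinel

if __name__ == '__main__':
    nquery_procs = 8
    queue = mp.Queue()
    worker_ends, query_ends = zip(*(mp.Pipe() for _ in range(nquery_procs)))

    query_procs = [mp.Process(target=query, args=(i, queue, pipe))
                   for i, pipe in enumerate(query_ends)]
    for p in query_procs:
        p.start()
    print(worker(queue, worker_ends))
    for p in query_procs:
        p.join()
```

The trade-off is that requests now flow through one shared queue instead of per-process pipes, so the worker no longer needs mp.connection.wait, but the replies still need the per-process pipes to reach the right sender.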

Upvotes: 1
