imap_unordered, but with a lazy flattened generator

I've got a problem that can already be solved by multiprocessing.Pool, but the solution is far from optimal. Namely, I have a rather small set of inputs, each of which maps to a large dataset. While I can use imap_unordered with a function returning a list, this is inefficient, because each of the large datasets must be returned as a complete list.

My function could return them as a generator for lower latency instead, but I cannot return a generator from a subprocess.

A dummy example:

import time
import multiprocessing


def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def wrapper(x):
    return list(generate(x))


with multiprocessing.Pool(10) as pool:
    for ready in pool.imap_unordered(wrapper, range(0, 100, 10)):
        for item in set(ready):  # to show that order does not matter:
            print(item)

The problem is that while the entire run now takes only a tenth of the time of running sequentially, I still need to wait 10 seconds for the very first result, which could be available right away with:

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

for ready in map(generate, range(0, 100, 10)):
    for item in set(ready):
        print(item)

This prints the first item without delay, but takes 100 seconds to run.

What I cannot do is subdivide the problem further; the generators in the subprocesses need to be evaluated lazily by the consumer:

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


with multiprocessing.Pool(10) as pool:
    for item in pool.??flatmap_unordered??(generate, range(0, 100, 10)):
        print(item)

which would print the first item right away, yet take only ~10 seconds to run!

How could I achieve that?

Upvotes: 7

Views: 1339

Answers (6)

Darkonaut

Reputation: 21684

I dislike the idea of bypassing Pool's internal queues by overlaying it with external queues. IMO it leads to many more moving parts and unnecessary complexity, and you easily end up creating hard-to-detect race conditions. Pool alone is fairly complex under the hood, and bloating the amount of code that runs with even more piping on top is something I'd rather avoid (KISS). It's using Pool only for its side effect of managing worker processes and, if at all, I'd only consider it for one-off code, not for a system built for stability or possibly evolving needs.

To give you some comparison for the complexity argument... Pool employs three worker-threads only to manage workers and funnel data back and forth. Together with the main-thread this makes four threads in the parent-process. The non-Pool version provided below, on the other hand, is bi-threaded (multiprocessing.Queue starts a feeder-thread upon first usage of .put()). The ~60 lines of the non-Pool solution compare to ~900 lines of multiprocessing/pool.py alone. A good portion of the latter will run anyway, only to shuffle around Nones instead of your actual results.

Pool is great for the frequent use case of processing function-tasks as a whole; retrieving sub-task results from generators just does not fit the bill here.


Using multiprocessing.Pool

Now if you are determined to go ahead with this approach anyway, at least don't use Manager for it. There's hardly any reason to reach for Manager on a single node, and the only case I can think of where you really need manager-queues is when you have to send a queue-reference to an already up-and-running process. Since Pool's initializer allows you to pass arguments at worker-process startup, manager-queues are not necessary here either.

You have to be aware that every interaction between parent- and child-process through Manager-proxies results in a detour through an extra manager-process, increasing latency through more IPC, context-switches and cache-flushes. It also has the potential to result in a considerably increased memory-footprint.

import time
import multiprocessing as mp

# def generate(x): ... # take from question

def init_queue(queue):
    globals()['queue'] = queue


def wrapper(x):
    q = queue
    for item in generate(x):
        q.put(item)


if __name__ == '__main__':


    POISON = 'POISON'

    queue = mp.SimpleQueue()

    with mp.Pool(processes=4, initializer=init_queue, initargs=(queue,)) as pool:
        pool.map_async(
            func=wrapper,
            iterable=range(0, 100, 10),
            chunksize=1,
            callback=lambda _: queue.put(POISON)
        )
        for res in iter(queue.get, POISON):
            print(res)
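
One caveat with the snippet above: map_async only invokes callback when every task succeeds, so if generate raises in a worker, POISON is never enqueued and the consumer loop blocks forever. A minimal sketch of one way around this, assuming it is acceptable to re-raise the first worker error after the stream has been drained. The names stream and on_error are hypothetical, and generate is a fast stand-in for the one from the question:

```python
import multiprocessing as mp

POISON = 'POISON'


def generate(x):
    # Fast stand-in for the generator from the question (assumption: no sleep).
    for j in range(x, x + 3):
        yield j


def init_queue(queue):
    # Runs once per worker: keep the queue as a module-level global.
    globals()['queue'] = queue


def wrapper(x):
    for item in generate(x):
        queue.put(item)  # `queue` is the global set by init_queue


def stream(pool, queue, inputs):
    """Yield items as they arrive; re-raise the first worker error at the end."""
    errors = []

    def on_error(exc):
        errors.append(exc)
        queue.put(POISON)  # unblock the consumer on failure as well

    pool.map_async(
        wrapper, inputs, chunksize=1,
        callback=lambda _: queue.put(POISON),
        error_callback=on_error,
    )
    yield from iter(queue.get, POISON)
    if errors:
        raise errors[0]


if __name__ == '__main__':
    queue = mp.SimpleQueue()
    with mp.Pool(4, initializer=init_queue, initargs=(queue,)) as pool:
        for res in stream(pool, queue, range(0, 30, 10)):
            print(res)
```

Both callbacks fire only once all tasks have finished, so POISON still arrives after the last item.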

Using multiprocessing.Process

Now the alternative I would prefer over using Pool in this case is building your own little specialized pool, with multiprocessing.Process and some multiprocessing-queue. Yes, it's a bit more code to write, but the amount of code that actually runs is considerably reduced compared to any solution involving multiprocessing.Pool. This leaves less room for subtle bugs, plus it's more open to changing conditions and uses fewer system resources.

import time
import multiprocessing as mp
from itertools import chain

DONE = 'DONE'
POISON = 'POISON'


def _worker(func, inqueue, outqueue):
    for chunk in iter(inqueue.get, POISON):
        for res in func(chunk):
            outqueue.put(res)
        outqueue.put(DONE)
    outqueue.put(POISON)


def _init_pool(n_workers, func):
    """Initialize worker-processes and queues."""
    inqueue, outqueue = mp.Queue(), mp.Queue()

    pool = [
        mp.Process(target=_worker, args=(func, inqueue, outqueue))
        for _ in range(n_workers)
    ]
    for p in pool:
        p.start()

    return pool, inqueue, outqueue


def iflatmap(n_workers, func, iterable):
    """Yield results from subprocesses unordered and immediately."""
    iterable = chain(iterable, [POISON] * n_workers)
    pool, inqueue, outqueue = _init_pool(n_workers, func)

    for _ in pool:
        inqueue.put(next(iterable))

    while n_workers:
        res = outqueue.get()
        if res == DONE:  # there's a free worker now
            inqueue.put(next(iterable))
        elif res == POISON:  # a worker has shut down
            n_workers -= 1
        else:
            yield res

    for p in pool:
        p.join()

The example below uses only four workers to show that this solution (also) doesn't rely on having the same number of workers and tasks. Since it also doesn't need to know the length of the input-iterable for its control-flow, you can pass a generator of unknown length as iterable. If you prefer it classy, you can wrap the logic above in a "Pool"-class instead.

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == '__main__':

    for res in iflatmap(n_workers=4, func=generate, iterable=range(0, 100, 10)):
        print(res)

For people unfamiliar with the usage of iter(object, sentinel): docs and some reasoning here
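
As a quick illustration of that two-argument form, here is a minimal single-process sketch. It uses a plain queue.Queue as a stand-in for the multiprocessing queues above (an assumption made only to keep the example self-contained):

```python
import queue

POISON = 'POISON'

q = queue.Queue()
for item in (1, 2, 3, POISON, 4):
    q.put(item)

# iter(callable, sentinel) calls q.get() repeatedly and stops as soon as
# the returned value equals the sentinel; the sentinel itself is not yielded.
received = list(iter(q.get, POISON))
print(received)  # [1, 2, 3]

# The explicit loop it replaces:
# while True:
#     item = q.get()
#     if item == POISON:
#         break
#     ...
```

Anything put after the sentinel (the 4 here) stays in the queue.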

Upvotes: 2

Paul Cornelius

Reputation: 10946

As the OP pointed out, objects passed between Processes must be pickleable. The stated goal is to process data objects as soon as they become available. Therefore the secondary Processes must pass the objects back to the main Process as soon as possible. This establishes constraints on which tools can be used.

The secondary Processes can use a multiprocessing.Queue to transmit the individual objects back to the main Process. The worker functions in the secondary Processes do not need to return anything. Some of the machinery in the standard library methods is designed to collect and manage returned values; it will not be useful. However, it is still necessary to initialize the secondary Processes and submit tasks to them. It is also handy to manage the program flow using the standard library context managers.

The methods of multiprocessing.Pool will work, but I prefer a solution using ProcessPoolExecutor. My impl1() below uses Pool, and impl2() uses the Executor. Both solutions are short and, I think, easy to understand and generalize. In both cases, a multiprocessing.Queue object must be initialized and passed to the secondary Process in its initialization function.

Since the order in which the data is returned to the main Process is not deterministic, it is necessary to have some way of sorting out which data came from which task. I use a simple integer indexing scheme and collect all the results into a single dictionary. This is managed in a secondary thread, while the main thread waits for all the submitted tasks to finish (the ProcessPoolExecutor context will not exit until all the submitted tasks are done). When that happens, I post a sentinel object to the queue to shut it down. I join the secondary thread to wait for all the objects to be processed. At this point the dictionary containing the collected data items is complete.

The code is multithreaded but the structure is pretty simple. The functions of the main Process are well separated into setup, submission of tasks, and data handling. There's no worry about race conditions.

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import threading
from collections import defaultdict

# Process code
_q = None
def initialize(q):
    global _q  # pylint: disable=global-statement
    _q = q

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def wrapper(x):
    for j in generate(x):
        _q.put((x, j))

# collection thread
def collect(q, results):
    for a, b in iter(q.get, (None, None)):
        print(a, b)
        results[a].append(b)
        
def impl1(q):
    with multiprocessing.Pool(10, initializer=initialize,
                              initargs=(q,)) as pool:
        for _ in pool.imap_unordered(wrapper, range(0, 100, 10)):
            pass
        
def impl2(q):
    with ProcessPoolExecutor(10, initializer=initialize, initargs=(q,)) as ex:
        for n in range(0, 100, 10):
            ex.submit(wrapper, n)
        
def main():
    results = defaultdict(list)
    q = multiprocessing.Queue()
    thr = threading.Thread(target=collect, args=(q, results))
    thr.start()

    # impl1(q)
    impl2(q)
    q.put((None, None))
    thr.join()
    print(results)

if __name__ == "__main__":
    main()

Upvotes: 1

Mortz

Reputation: 4939

You could probably use multiprocessing.Pipe and send the results over the connections to get the first item(s) immediately:

import time
import multiprocessing

def generate_and_send(x, conn):
    for j in range(x, x + 10):
        conn.send(j)
        time.sleep(1)
    conn.send("POISON")

if __name__ == '__main__':
    print(f'program begins at {time.ctime()}')
    pipes = [multiprocessing.Pipe() for _ in range(10)]
    parent_conns = [parent for (parent, child) in pipes]
    child_conns = [child for (parent, child) in pipes]
    processes = [multiprocessing.Process(target=generate_and_send, args=(x, conn))
                 for x, conn in zip(range(0, 100, 10), child_conns)]

    for p in processes:
        p.start()

    first_printed = False
    while parent_conns:
        # iterate over a copy, because finished connections are removed in the loop
        for par_conn in list(parent_conns):
            recvd_val = par_conn.recv()
            if recvd_val == "POISON":
                par_conn.close()
                parent_conns.remove(par_conn)
            else:
                if not first_printed:
                    print(f'first item printed at {time.ctime()}')
                    first_printed = True
                print(recvd_val)
    print(f'program ends at {time.ctime()}')
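
The loop above calls recv() on each connection in turn, so one slow producer holds back items that are already waiting on the other pipes. A sketch of an alternative, assuming the standard-library multiprocessing.connection.wait is acceptable here. In this sketch drain is a hypothetical helper name, end-of-stream is signalled by closing the pipe instead of sending "POISON", and generate_and_send is shortened so the demo finishes quickly:

```python
import multiprocessing
from multiprocessing.connection import wait


def generate_and_send(x, conn):
    for j in range(x, x + 3):
        conn.send(j)
    conn.close()  # closing the write end is the end-of-stream signal


def drain(readers):
    """Yield values from whichever connection is ready until all are closed."""
    readers = list(readers)
    while readers:
        for conn in wait(readers):  # blocks until at least one is readable
            try:
                yield conn.recv()
            except EOFError:  # the writer closed its end
                conn.close()
                readers.remove(conn)


if __name__ == '__main__':
    readers, processes = [], []
    for x in range(0, 30, 10):
        reader, writer = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=generate_and_send, args=(x, writer))
        p.start()
        # The parent must close its copy of the write end,
        # otherwise the reader never sees EOF.
        writer.close()
        readers.append(reader)
        processes.append(p)

    for item in drain(readers):
        print(item)

    for p in processes:
        p.join()
```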

Upvotes: 0

MisterMiyagi

Reputation: 52079

There seems to be no built-in way for a Pool to incrementally collect generated items. However, it is reasonably straightforward to write your own "flat map" helper.

The general idea is to have a wrapper in the pool processes that runs the iterator and pushes each individual item to a queue. In the main process, there is just a plain loop that gets and yields each item.

import functools
import multiprocessing


def flatmap(pool: multiprocessing.Pool, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map()"""
    # use a queue to stream individual results from processes
    queue = multiprocessing.Manager().Queue()
    # reuse task management and mapping of Pool
    pool.map_async(
        functools.partial(_flat_mappper, queue, func),
        iterable,
        chunksize,
        # callback: push a signal that everything is done
        lambda _: queue.put(None),
        lambda err: queue.put((None, err))
    )
    # yield each result as it becomes available
    while True:
        item = queue.get()
        if item is None:
            break
        result, err = item
        if err is None:
            yield result
        else:
            raise err


def _flat_mappper(queue: multiprocessing.Queue, func, *args):
    """Helper to run `(*args) -> iter` and stream results to a queue"""
    for item in func(*args):
        queue.put((item, None))

If desired, one could patch the Pool type itself to have flatmap as a method instead of a function.
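
A minimal sketch of that patching, with two assumptions worth flagging. First, multiprocessing.Pool is a factory function rather than a class, so the attribute has to be set on multiprocessing.pool.Pool. Second, the names _flat_mapper and the fast generate are illustrative only. The body is a condensed form of the flatmap above, taking self in place of pool:

```python
import functools
import multiprocessing
import multiprocessing.pool


def _flat_mapper(queue, func, arg):
    """Run func(arg) in a worker and stream each item to the queue."""
    for item in func(arg):
        queue.put((item, None))


def flatmap(self, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map(), as a method."""
    queue = multiprocessing.Manager().Queue()
    self.map_async(
        functools.partial(_flat_mapper, queue, func),
        iterable,
        chunksize,
        lambda _: queue.put(None),           # all tasks done
        lambda err: queue.put((None, err)),  # a task failed
    )
    for result, err in iter(queue.get, None):
        if err is not None:
            raise err
        yield result


# multiprocessing.Pool(...) returns an instance of this class.
multiprocessing.pool.Pool.flatmap = flatmap


def generate(x):
    # Fast stand-in for the generator from the question (assumption: no sleep).
    yield from range(x, x + 3)


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(sorted(pool.flatmap(generate, range(0, 30, 10))))
```

Monkey-patching a standard-library class affects every Pool in the process, so a plain function or a subclass may be the safer choice outside of one-off scripts.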


The flatmap helper can be directly used to accumulate results across the generators. For the example case, it finishes in a bit more than 10 seconds.

import time

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == "__main__":
    with multiprocessing.Pool(10) as pool:
        for item in flatmap(pool, generate, range(0, 100, 10)):
            print(item)

Upvotes: 2

Booboo

Reputation: 44283

This is one instance where I believe the simplest approach would be to implement your own multiprocessing pool from daemon processes and multiprocessing.Queue instances (which are more performant than the managed queues returned by calls to multiprocessing.Manager().Queue()):

import time
import multiprocessing


# We just need something distinct from a value
# generated by generate. In this case nothing fancy is required:
SENTINEL = None

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def worker(in_q, out_q):
    while True:
        x = in_q.get()
        for j in generate(x):
            out_q.put(j)
        # put sentinel to queue to show task is complete:
        out_q.put(SENTINEL)


# In case we are running under Windows:
if __name__ == '__main__':
    POOL_SIZE = 10
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    for _ in range(POOL_SIZE):
        multiprocessing.Process(target=worker, args=(in_q, out_q), daemon=True).start()
    t = time.time()
    # Submit the 10 tasks:
    for x in range(0, 100, 10):
        in_q.put(x)
    # We have submitted 10 tasks, so when we have seen 10
    # sentinels, we know we have processed all the results
    sentinels_seen = 0
    results = []
    while sentinels_seen < 10:
        return_value = out_q.get()
        if return_value is SENTINEL:
            sentinels_seen += 1
        else:
            # Process return value:
            results.append(return_value)
            if len(results) == 1:
                # First result:
                print('Elapsed time to first result:', time.time() - t)
    print('Total elapsed time:', time.time() - t)

Prints:

Elapsed time to first result: 0.004938602447509766
Total elapsed time: 10.025248765945435
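
A note on the sentinel choice: everything put on a multiprocessing.Queue is pickled and rebuilt on the receiving side, so an identity check such as `is SENTINEL` only works for singletons like None. A small sketch of the difference, where roundtrip is a hypothetical helper that mimics what the queue does to an object:

```python
import pickle
import secrets


def roundtrip(obj):
    """Serialize and rebuild obj, as happens when it crosses a process boundary."""
    return pickle.loads(pickle.dumps(obj))


unique = object()
token = secrets.token_bytes()

print(roundtrip(None) is None)        # True: None is a singleton
print(roundtrip(unique) is unique)    # False: a new object is created
print(roundtrip(token) == token)      # True: equality survives the round trip
```

So if None can be a legitimate result, a random token or a dedicated string compared with == is one workable replacement.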

Upvotes: 2

jsbueno

Reputation: 110591

MisterMiyagi's answer may stall on Linux due to the way the Queue is serialized when inside a partial object (I can't figure out why).

This is the same code, with a hack to pass the queue around to the mapper on the parameter side, and not embedded in the callable object.

Brain feeling in slow motion today, sorry I can't figure out exactly what is taking place - anyway, this variant of the code worked here:

import functools
import multiprocessing
import secrets
from itertools import repeat
import time


def flatmap(pool: multiprocessing.Pool, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map()"""
    # use a queue to stream individual results from processes
    queue = multiprocessing.Manager().Queue()
    sentinel = secrets.token_bytes()
    # reuse task management and mapping of Pool
    pool.map_async(
        functools.partial(_flat_mappper, func),
        zip(iterable,repeat(queue)),
        chunksize,
        # callback: push a signal that everything is done
        lambda _: queue.put(sentinel),
        lambda *e: print("argh!", e),
    )
    # yield each result as it becomes available
    while True:
        item = queue.get()
        if item == sentinel:
            break
        yield item


def _flat_mappper(func, arg):
    """Helper to run `(*args) -> iter` and stream results to a queue"""
    data, queue = arg
    for item in func(data):
        queue.put(item)

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == "__main__":
    with multiprocessing.Pool(10) as pool:
        for item in flatmap(pool, generate, range(0, 100, 10)):
            print(item)

Upvotes: 1
