Richard Neumann
Richard Neumann

Reputation: 3381

Multiprocessing program does not terminate

I have a program using multiprocessing.

Here's the relevant core:

"""Multiprocessing worker."""

from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type

from setproctitle import setproctitle

from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger


__all__ = ['BaseWorker', 'multiprocess']


class BaseWorker:
    """Callable worker that processes systems taken from a queue.

    An instance is used as the target of a process: calling it takes
    systems from the ``systems`` queue one at a time and puts a
    ``(system, result)`` tuple on the ``results`` queue for each of them.
    Subclasses provide the actual work by overriding ``run``.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: Queue, results: Queue):
        """Stores the worker's index and its input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        # Cleared by the SIGUSR2 handler to stop the loop in __call__.
        self.running = True
        # The system most recently taken from the input queue, if any.
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Processes queued systems until the queue is empty or aborted."""
        setproctitle(self.name)

        for signal_number in (SIGUSR1, SIGUSR2):
            signal(signal_number, self.signal)

        while self.running:
            try:
                target = self.systems.get(timeout=1)
            except Empty:
                # Nothing arrived within a second: treat the work as done.
                self.logger.info('Finished')
                return

            self.current_system = target
            outcome = self.process_system(target, args)
            self.results.put_nowait((target, outcome))

        self.logger.info('Aborted')

    @property
    def info(self) -> str:
        """Describes what the worker is currently doing."""
        current = self.current_system
        return 'idle' if current is None else f'Processing system #{current}'

    @property
    def logger(self) -> Logger:
        """Returns the logger named after this worker, set to INFO level."""
        worker_logger = getLogger(self.name)
        worker_logger.setLevel(INFO)
        return worker_logger

    @property
    def name(self) -> str:
        """Returns the name used for the process title and the logger."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Reacts to SIGUSR1 (log status) and SIGUSR2 (stop the loop)."""
        if signal_number == SIGUSR2:
            self.running = False
            return

        if signal_number == SIGUSR1:
            self.logger.info(self.info)
            return

        self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Runs the job on one system and returns a timed report of it."""
        started = datetime.now()
        report = {'start': started.isoformat()}

        try:
            outcome = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            report['online'] = False
        else:
            report['result'] = outcome
            report['online'] = True

        finished = datetime.now()
        report['end'] = finished.isoformat()
        report['duration'] = str(finished - started)
        return report

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Performs the actual work; must be overridden by subclasses."""
        raise NotImplementedError()


def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Processes the systems with several workers and returns the results.

    The systems are queued up front, ``processes`` workers are started on
    that queue, all of them are joined, and afterwards the results queue is
    read into a dict built from the ``(system, result)`` items on it.
    """

    pending = sequence_to_queue(systems)
    results = Queue()
    workers = list(
        spawn_workers(worker_cls, processes, pending, results, args)
    )
    wait_for_processes(workers)
    return dict(iter_queue(results))


def sequence_to_queue(sequence: Sequence[Any]) -> Queue:
    """Creates a queue sized to the sequence and fills it in order."""

    filled = Queue(maxsize=len(sequence))

    for element in sequence:
        filled.put(element)

    return filled


def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: Queue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Lazily starts ``amount`` worker processes, yielding each in turn.

    Every worker gets its own index and shares the two queues. A process
    is only created and started when the iterator is advanced to it.
    """

    workers = (
        worker_cls(number, systems, results) for number in range(amount)
    )

    for worker in workers:
        child = Process(target=worker, args=(args,))
        child.start()
        yield child


def wait_for_processes(processes: list[Process]) -> None:
    """Joins every process; on Ctrl+C kills them all and re-raises."""

    def kill_all() -> None:
        for child in processes:
            child.kill()

    try:
        for child in processes:
            child.join()
    except KeyboardInterrupt:
        # Do not leave workers behind when the user interrupts the wait.
        kill_all()
        raise


def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yields items from the queue until it reports being empty."""

    while True:
        if queue.empty():
            return

        yield queue.get()

You can find the full program here.

The problem is that the program does not terminate even after all subprocesses have finished, especially when there are many subprocesses:

$ sysrpc -Rp 8 {201..900}
[ERROR] sysrpc.201: Could not establish SSH connection.
[ERROR] sysrpc.222: Could not establish SSH connection.
[ERROR] sysrpc.208: Could not establish SSH connection.
[ERROR] sysrpc.209: Could not establish SSH connection.
[ERROR] sysrpc.210: Could not establish SSH connection.
[ERROR] sysrpc.217: Could not establish SSH connection.
[ERROR] sysrpc.223: Could not establish SSH connection.
[ERROR] sysrpc.224: Could not establish SSH connection.
[ERROR] sysrpc.225: Could not establish SSH connection.
[ERROR] sysrpc.226: Could not establish SSH connection.
[ERROR] sysrpc.228: Could not establish SSH connection.
[ERROR] sysrpc.229: Could not establish SSH connection.
[ERROR] sysrpc.230: Could not establish SSH connection.
[ERROR] sysrpc.231: Could not establish SSH connection.
[ERROR] sysrpc.232: Could not establish SSH connection.
[ERROR] sysrpc.233: Could not establish SSH connection.
[ERROR] sysrpc.234: Could not establish SSH connection.
[ERROR] sysrpc.247: Could not establish SSH connection.
[ERROR] sysrpc.235: Could not establish SSH connection.
[ERROR] sysrpc.256: Could not establish SSH connection.
[ERROR] sysrpc.258: Could not establish SSH connection.
[ERROR] sysrpc.241: Could not establish SSH connection.
[ERROR] sysrpc.246: Could not establish SSH connection.
[ERROR] sysrpc.278: Could not establish SSH connection.
[ERROR] sysrpc.262: Could not establish SSH connection.
[ERROR] sysrpc.287: Could not establish SSH connection.
[ERROR] sysrpc.276: Could not establish SSH connection.
[ERROR] sysrpc.299: Could not establish SSH connection.
[ERROR] sysrpc.301: Could not establish SSH connection.
[ERROR] sysrpc.263: Could not establish SSH connection.
[ERROR] sysrpc.268: Could not establish SSH connection.
[ERROR] sysrpc.315: Could not establish SSH connection.
[ERROR] sysrpc.274: Could not establish SSH connection.
[ERROR] sysrpc.320: Could not establish SSH connection.
[ERROR] sysrpc.318: Could not establish SSH connection.
[ERROR] sysrpc.302: Could not establish SSH connection.
[ERROR] sysrpc.330: Could not establish SSH connection.
[ERROR] sysrpc.311: Could not establish SSH connection.
[ERROR] sysrpc.312: Could not establish SSH connection.
[ERROR] sysrpc.335: Could not establish SSH connection.
[ERROR] sysrpc.294: Could not establish SSH connection.
[ERROR] sysrpc.358: Could not establish SSH connection.
[ERROR] sysrpc.360: Could not establish SSH connection.
[ERROR] sysrpc.362: Could not establish SSH connection.
[ERROR] sysrpc.308: Could not establish SSH connection.
[ERROR] sysrpc.336: Could not establish SSH connection.
[ERROR] sysrpc.380: Could not establish SSH connection.
[ERROR] sysrpc.386: Could not establish SSH connection.
[ERROR] sysrpc.349: Could not establish SSH connection.
[ERROR] sysrpc.357: Could not establish SSH connection.
[ERROR] sysrpc.417: Could not establish SSH connection.
[ERROR] sysrpc.422: Could not establish SSH connection.
[ERROR] sysrpc.435: Could not establish SSH connection.
[ERROR] sysrpc.408: Could not establish SSH connection.
[ERROR] sysrpc.412: Could not establish SSH connection.
[ERROR] sysrpc.420: Could not establish SSH connection.
[ERROR] sysrpc.436: Could not establish SSH connection.
[ERROR] sysrpc.390: Could not establish SSH connection.
[ERROR] sysrpc.439: Could not establish SSH connection.
[ERROR] sysrpc.440: Could not establish SSH connection.
[ERROR] sysrpc.441: Could not establish SSH connection.
[ERROR] sysrpc.443: Could not establish SSH connection.
[ERROR] sysrpc.446: Could not establish SSH connection.
[ERROR] sysrpc.433: Could not establish SSH connection.
[ERROR] sysrpc.448: Could not establish SSH connection.
[ERROR] sysrpc.449: Could not establish SSH connection.
[ERROR] sysrpc.450: Could not establish SSH connection.
[ERROR] sysrpc.451: Could not establish SSH connection.
[ERROR] sysrpc.452: Could not establish SSH connection.
[ERROR] sysrpc.453: Could not establish SSH connection.
[ERROR] sysrpc.454: Could not establish SSH connection.
[ERROR] sysrpc.455: Could not establish SSH connection.
[ERROR] sysrpc.456: Could not establish SSH connection.
[ERROR] sysrpc.457: Could not establish SSH connection.
[ERROR] sysrpc.464: Could not establish SSH connection.
[ERROR] sysrpc.458: Could not establish SSH connection.
[ERROR] sysrpc.459: Could not establish SSH connection.
[ERROR] sysrpc.460: Could not establish SSH connection.
[ERROR] sysrpc.461: Could not establish SSH connection.
[ERROR] sysrpc.462: Could not establish SSH connection.
[ERROR] sysrpc.463: Could not establish SSH connection.
[ERROR] sysrpc.465: Could not establish SSH connection.
[ERROR] sysrpc.469: Could not establish SSH connection.
[ERROR] sysrpc.471: Could not establish SSH connection.
[ERROR] sysrpc.473: Could not establish SSH connection.
[ERROR] sysrpc.474: Could not establish SSH connection.
[ERROR] sysrpc.479: Could not establish SSH connection.
[ERROR] sysrpc.475: Could not establish SSH connection.
[ERROR] sysrpc.476: Could not establish SSH connection.
[ERROR] sysrpc.477: Could not establish SSH connection.
[ERROR] sysrpc.480: Could not establish SSH connection.
[ERROR] sysrpc.481: Could not establish SSH connection.
[ERROR] sysrpc.482: Could not establish SSH connection.
[ERROR] sysrpc.483: Could not establish SSH connection.
[ERROR] sysrpc.484: Could not establish SSH connection.
[ERROR] sysrpc.489: Could not establish SSH connection.
[ERROR] sysrpc.491: Could not establish SSH connection.
[ERROR] sysrpc.492: Could not establish SSH connection.
[ERROR] sysrpc.497: Could not establish SSH connection.
[ERROR] sysrpc.528: Could not establish SSH connection.
[ERROR] sysrpc.498: Could not establish SSH connection.
[ERROR] sysrpc.532: Could not establish SSH connection.
[ERROR] sysrpc.533: Could not establish SSH connection.
[ERROR] sysrpc.505: Could not establish SSH connection.
[ERROR] sysrpc.541: Could not establish SSH connection.
[ERROR] sysrpc.544: Could not establish SSH connection.
[ERROR] sysrpc.546: Could not establish SSH connection.
[ERROR] sysrpc.511: Could not establish SSH connection.
[ERROR] sysrpc.521: Could not establish SSH connection.
[ERROR] sysrpc.569: Could not establish SSH connection.
[ERROR] sysrpc.573: Could not establish SSH connection.
[ERROR] sysrpc.579: Could not establish SSH connection.
[ERROR] sysrpc.586: Could not establish SSH connection.
[ERROR] sysrpc.543: Could not establish SSH connection.
[ERROR] sysrpc.559: Could not establish SSH connection.
[ERROR] sysrpc.563: Could not establish SSH connection.
[ERROR] sysrpc.572: Could not establish SSH connection.
[ERROR] sysrpc.602: Could not establish SSH connection.
[ERROR] sysrpc.606: Could not establish SSH connection.
[ERROR] sysrpc.581: Could not establish SSH connection.
[ERROR] sysrpc.614: Could not establish SSH connection.
[ERROR] sysrpc.584: Could not establish SSH connection.
[ERROR] sysrpc.616: Could not establish SSH connection.
[ERROR] sysrpc.618: Could not establish SSH connection.
[ERROR] sysrpc.588: Could not establish SSH connection.
[ERROR] sysrpc.632: Could not establish SSH connection.
[ERROR] sysrpc.642: Could not establish SSH connection.
[ERROR] sysrpc.646: Could not establish SSH connection.
[ERROR] sysrpc.626: Could not establish SSH connection.
[ERROR] sysrpc.627: Could not establish SSH connection.
[ERROR] sysrpc.657: Could not establish SSH connection.
[ERROR] sysrpc.647: Could not establish SSH connection.
[ERROR] sysrpc.650: Could not establish SSH connection.
[ERROR] sysrpc.611: Could not establish SSH connection.
[ERROR] sysrpc.623: Could not establish SSH connection.
[ERROR] sysrpc.668: Could not establish SSH connection.
[ERROR] sysrpc.669: Could not establish SSH connection.
[ERROR] sysrpc.670: Could not establish SSH connection.
[ERROR] sysrpc.656: Could not establish SSH connection.
[ERROR] sysrpc.663: Could not establish SSH connection.
[ERROR] sysrpc.687: Could not establish SSH connection.
[ERROR] sysrpc.688: Could not establish SSH connection.
[ERROR] sysrpc.689: Could not establish SSH connection.
[ERROR] sysrpc.690: Could not establish SSH connection.
[ERROR] sysrpc.691: Could not establish SSH connection.
[ERROR] sysrpc.693: Could not establish SSH connection.
[ERROR] sysrpc.702: Could not establish SSH connection.
[ERROR] sysrpc.694: Could not establish SSH connection.
[ERROR] sysrpc.710: Could not establish SSH connection.
[ERROR] sysrpc.715: Could not establish SSH connection.
[ERROR] sysrpc.717: Could not establish SSH connection.
[ERROR] sysrpc.719: Could not establish SSH connection.
[ERROR] sysrpc.696: Could not establish SSH connection.
[ERROR] sysrpc.697: Could not establish SSH connection.
[ERROR] sysrpc.705: Could not establish SSH connection.
[ERROR] sysrpc.742: Could not establish SSH connection.
[ERROR] sysrpc.743: Could not establish SSH connection.
[ERROR] sysrpc.744: Could not establish SSH connection.
[ERROR] sysrpc.747: Could not establish SSH connection.
[ERROR] sysrpc.748: Could not establish SSH connection.
[ERROR] sysrpc.722: Could not establish SSH connection.
[ERROR] sysrpc.695: Could not establish SSH connection.
[ERROR] sysrpc.773: Could not establish SSH connection.
[ERROR] sysrpc.776: Could not establish SSH connection.
[ERROR] sysrpc.783: Could not establish SSH connection.
[ERROR] sysrpc.798: Could not establish SSH connection.
[ERROR] sysrpc.803: Could not establish SSH connection.
[ERROR] sysrpc.812: Could not establish SSH connection.
[ERROR] sysrpc.818: Could not establish SSH connection.
[ERROR] sysrpc.819: Could not establish SSH connection.
[ERROR] sysrpc.822: Could not establish SSH connection.
[ERROR] sysrpc.821: Could not establish SSH connection.
[ERROR] sysrpc.830: Could not establish SSH connection.
[ERROR] sysrpc.831: Could not establish SSH connection.
[ERROR] sysrpc.835: Could not establish SSH connection.
[ERROR] sysrpc.789: Could not establish SSH connection.
[ERROR] sysrpc.790: Could not establish SSH connection.
[ERROR] sysrpc.841: Could not establish SSH connection.
[ERROR] sysrpc.826: Could not establish SSH connection.
[ERROR] sysrpc.842: Could not establish SSH connection.
[ERROR] sysrpc.848: Could not establish SSH connection.
[ERROR] sysrpc.849: Could not establish SSH connection.
[ERROR] sysrpc.850: Could not establish SSH connection.
[ERROR] sysrpc.851: Could not establish SSH connection.
[ERROR] sysrpc.853: Could not establish SSH connection.
[ERROR] sysrpc.854: Could not establish SSH connection.
[ERROR] sysrpc.860: Could not establish SSH connection.
[ERROR] sysrpc.866: Could not establish SSH connection.
[ERROR] sysrpc.867: Could not establish SSH connection.
[ERROR] sysrpc.868: Could not establish SSH connection.
[ERROR] sysrpc.872: Could not establish SSH connection.
[ERROR] sysrpc.856: Could not establish SSH connection.
[ERROR] sysrpc.883: Could not establish SSH connection.
[ERROR] sysrpc.881: Could not establish SSH connection.
[ERROR] sysrpc.877: Could not establish SSH connection.
[ERROR] sysrpc.876: Could not establish SSH connection.
[ERROR] sysrpc.882: Could not establish SSH connection.
[ERROR] sysrpc.887: Could not establish SSH connection.
[ERROR] sysrpc.884: Could not establish SSH connection.
[ERROR] sysrpc.888: Could not establish SSH connection.
[INFO] hidsltools-worker-2: Finished
[ERROR] sysrpc.885: Could not establish SSH connection.
[INFO] hidsltools-worker-0: Finished
[ERROR] sysrpc.896: Could not establish SSH connection.
[ERROR] sysrpc.886: Could not establish SSH connection.
[INFO] hidsltools-worker-5: Finished
[ERROR] sysrpc.897: Could not establish SSH connection.
[INFO] hidsltools-worker-1: Finished
[INFO] hidsltools-worker-4: Finished
[ERROR] sysrpc.898: Could not establish SSH connection.
[ERROR] sysrpc.899: Could not establish SSH connection.
[INFO] hidsltools-worker-7: Finished
[INFO] hidsltools-worker-6: Finished
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished

As you can see, all eight workers finished, but the program did not terminate. If I hit return, this will also not terminate the main process:

...
[ERROR] sysrpc.900: Could not establish SSH connection.
[INFO] hidsltools-worker-3: Finished









Only if I press Ctrl+C, the main process will exit:

[INFO] hidsltools-worker-3: Finished







^CProcess Process-7:
1 ✗ neumann@ThinkCentre ~ $ 

Why does the main process not terminate, even if all subprocesses terminated?

Upvotes: 1

Views: 75

Answers (1)

Booboo
Booboo

Reputation: 44313

If you read the documentation on multiprocessing.Queue, in particular the warnings, you will see:

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

Specifically, you are calling wait_for_processes, which attempts to join the child processes, and only then calling iter_queue, which calls get on the queue that the child processes have put items on. (By the way, the documentation also makes clear that queue.empty() is not reliable for multiprocessing.Queue instances.)

You clearly need to reverse the order of operations, i.e. the main process needs to get all the items off the queue before it attempts to join the child processes -- and this should be done without using queue.empty.

One solution is to use a multiprocessing.JoinableQueue for the input queue that the child processes get from, and to put all the items to be processed on this queue before creating the child processes. After a child process has retrieved an item from this queue and has put its response on the output queue, it calls task_done on the input queue, signaling that processing of that piece of work has been completed. The main process then only has to call join on the input queue to ensure that all input tasks have been processed and their responses have been put on the output queue. Then you just have to call get_nowait on the results queue until it raises an Empty exception.

This is the general (however, untested) idea. You may have to add additional code to handle the case when a signal is issued and the child processes do not get to process the entire input queue:

"""Multiprocessing worker."""

from argparse import Namespace
from datetime import datetime
from logging import INFO, Logger, getLogger
from multiprocessing import Process, Queue, JoinableQueue
from queue import Empty
from signal import SIGUSR1, SIGUSR2, signal
from typing import Any, Iterator, Sequence, Type

from setproctitle import setproctitle

from homeinfotools.exceptions import SSHConnectionError
from homeinfotools.logging import syslogger


__all__ = ['BaseWorker', 'multiprocess']


class BaseWorker:
    """Callable worker that processes systems taken from a joinable queue.

    An instance is used as the target of a process: calling it takes
    systems from the ``systems`` queue one at a time, puts a
    ``(system, result)`` tuple on the ``results`` queue for each processed
    system and marks the item as done on the input queue.
    Subclasses provide the actual work by overriding ``run``.
    """

    __slots__ = ('index', 'systems', 'results', 'running', 'current_system')

    def __init__(self, index: int, systems: JoinableQueue, results: Queue):
        """Stores the worker's index and its input and output queues."""
        self.index = index
        self.systems = systems
        self.results = results
        # Cleared by the SIGUSR2 handler to stop processing further items.
        self.running = True
        # The system most recently taken from the input queue, if any.
        self.current_system = None

    def __call__(self, args: Namespace) -> None:
        """Consumes the input queue until it is empty.

        After an abort request (SIGUSR2) the remaining items are still
        taken off the queue and marked as done, but no longer processed.
        """
        setproctitle(self.name)
        signal(SIGUSR1, self.signal)
        signal(SIGUSR2, self.signal)

        # The input queue must be consumed completely even when
        # self.running is False, else the main process's call to join()
        # on the input queue will never return.
        while True:
            try:
                # Use a short timeout rather than get_nowait(): the latter
                # may raise Empty spuriously (e.g. while the parent's
                # feeder thread is still flushing items or another worker
                # is reading), which would retire this worker too early.
                self.current_system = self.systems.get(timeout=1)
            except Empty:
                self.logger.info('Finished' if self.running else 'Aborted')
                return

            try:
                if self.running:
                    # then process the input for real:
                    result = self.process_system(self.current_system, args)
                    self.results.put_nowait((self.current_system, result))
            finally:
                # Always account for the item, even if processing raised,
                # so that the parent is not left waiting for it.
                self.systems.task_done()

    @property
    def info(self) -> str:
        """Returns information about the state of the worker."""
        if self.current_system is None:
            return 'idle'

        return f'Processing system #{self.current_system}'

    @property
    def logger(self) -> Logger:
        """Returns the logger named after this worker, set to INFO level."""
        logger = getLogger(self.name)
        logger.setLevel(INFO)
        return logger

    @property
    def name(self) -> str:
        """Returns the name used for the process title and the logger."""
        return f'hidsltools-worker-{self.index}'

    def signal(self, signal_number: int, _: Any) -> None:
        """Handles SIGUSR1 (log status) and SIGUSR2 (stop processing)."""
        if signal_number == SIGUSR1:
            self.logger.info(self.info)
        elif signal_number == SIGUSR2:
            self.running = False
        else:
            self.logger.error('Received invalid signal: %i', signal_number)

    def process_system(self, system: int, args: Namespace) -> dict:
        """Runs the job on one system and returns a timed report of it.

        The report holds the ISO formatted ``start`` and ``end`` times, the
        ``duration``, an ``online`` flag that is False when the SSH
        connection failed, and the ``result`` of ``run`` on success.
        """
        result = {'start': (start := datetime.now()).isoformat()}

        try:
            result['result'] = self.run(system, args)
        except SSHConnectionError:
            syslogger(system).error('Could not establish SSH connection.')
            result['online'] = False
        else:
            result['online'] = True

        result['end'] = (end := datetime.now()).isoformat()
        result['duration'] = str(end - start)
        return result

    @staticmethod
    def run(system: int, args: Namespace) -> dict:
        """Performs the actual work; must be overridden by subclasses."""
        raise NotImplementedError()


def multiprocess(
        worker_cls: Type[BaseWorker],
        systems: list[int],
        processes: int,
        args: Namespace
) -> dict:
    """Spawns workers, collects their results and waits for them to finish.

    Returns a dict built from the ``(system, result)`` items that the
    workers put on the results queue. On Ctrl+C all workers are killed and
    the KeyboardInterrupt is re-raised.
    """

    # First put all items on the input queue; the workers treat an empty
    # input queue as the signal to terminate.
    systems_queue = sequence_to_queue(systems)
    results = Queue()
    # Then start the children that will process the input queue:
    process_list = list(spawn_workers(
        worker_cls,
        processes,
        systems_queue,
        results,
        args
    ))
    collected = {}

    try:
        # Wait for all work to be completed:
        systems_queue.join()

        # A worker may call task_done() before its feeder thread has
        # written the result to the pipe, so draining once with
        # get_nowait() could miss items. Keep reading until every worker
        # has exited (a process only exits after flushing its buffered
        # items) and the queue has nothing left.
        while True:
            # Sample liveness *before* reading: if nobody was alive and the
            # read still comes back empty, no further item can arrive.
            workers_alive = any(
                process.is_alive() for process in process_list
            )

            try:
                system, result = results.get(timeout=0.1)
            except Empty:
                if not workers_alive:
                    break
            else:
                collected[system] = result
    except KeyboardInterrupt:
        for process in process_list:
            process.kill()

        raise

    # Finally reap the (already drained) processes:
    wait_for_processes(process_list)
    return collected


def sequence_to_queue(sequence: Sequence[Any]) -> JoinableQueue:
    """Creates a joinable queue sized to the sequence and fills it in order."""

    filled = JoinableQueue(maxsize=len(sequence))

    for element in sequence:
        filled.put(element)

    return filled


def spawn_workers(
        worker_cls: Type[BaseWorker],
        amount: int,
        systems: JoinableQueue,
        results: Queue,
        args: Namespace
) -> Iterator[Process]:
    """Lazily starts ``amount`` worker processes, yielding each in turn.

    Every worker gets its own index and shares the two queues. A process
    is only created and started when the iterator is advanced to it.
    """

    def launch(index: int) -> Process:
        child = Process(
            target=worker_cls(index, systems, results), args=(args,)
        )
        child.start()
        return child

    yield from map(launch, range(amount))


def wait_for_processes(processes: list[Process]) -> None:
    """Joins every process; on Ctrl+C kills them all and re-raises."""

    def terminate_everything() -> None:
        for worker in processes:
            worker.kill()

    try:
        for worker in processes:
            worker.join()
    except KeyboardInterrupt:
        # Do not leave workers behind when the user interrupts the wait.
        terminate_everything()
        raise


def iter_queue(queue: Queue) -> Iterator[Any]:
    """Yields the items that can be fetched from the queue without blocking.

    Iteration ends at the first ``get_nowait()`` that raises ``Empty``.
    """

    while True:
        try:
            item = queue.get_nowait()
        except Empty:
            break
        else:
            yield item

Note

In the above code I have chosen to put all the input items on the input queue prior to creating the child processes. This allows the child processes to execute the get_nowait method on the input queue and when it returns an Empty exception the child process knows that all input has been processed and it can return allowing the eventual join for this process to complete. Alternatively, the main process can put items on the input queue after having started the child processes. But in this case the child process must issue a blocking get call on the queue and can never know if and when there will be any future items placed on the queue. In this case, the child processes will never terminate (unless the main process puts N sentinel items, e.g. None, signifying that these are the final items that will ever be placed on the queue and the child process then terminates after seeing one of these sentinels). Therefore, these child processes will need to be daemon processes that are never joined by the main process.


A second solution is for each child process to put a None item on the results queue (or some value other than None that cannot be confused with an actual response value and can serve as a "no more output will be written by this child process" indicator) after it has detected that there are no more items on the input queue. Then the main process just repeatedly calls get on the results queue until it has detected N None items, where N is the number of child processes that are processing items on the input queue.

By the way, it's better that you do not have a local variable named system that happens to be the same name as a standard Python library module.

Upvotes: 3

Related Questions