moonman4

Reputation: 318

Python: multiprocessing with click

I have the following files:

binreader/
├─ packet/
│  ├─ __init__.py
│  ├─ aggregator.py
│  ├─ parser.py
│  ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py

Code that reproduces the error:

/packet/__init__.py

<empty>

/packet/aggregator.py

from multiprocessing import Queue, Process
import logging


log = logging.getLogger()


class AggregatorProcess(Process):
    def __init__(self, q_in: Queue, q_out: Queue):
        super(AggregatorProcess, self).__init__()
        self.q_in = q_in
        self.q_out = q_out

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)

        log.debug("Aggregator: Done")
        self.q_out.put(None)

/packet/parser.py

from multiprocessing import Queue, Process
import logging
from typing import List

log = logging.getLogger()


class ParserProcess(Process):
    """Process-based version of the parser class"""

    def __init__(self, data: List, q_out: Queue):
        super(ParserProcess, self).__init__()
        self.q_out = q_out
        self.data = data

    def run(self):

        for x in self.data:
            log.debug(f"Parser: {x}")
            self.q_out.put(x)

        log.debug("Parser: Done")
        self.q_out.put(None)

/packet/uploader.py

from multiprocessing import Queue, Process
import logging

log = logging.getLogger()


class UploaderProcess(Process):
    def __init__(self, q_in: Queue) -> None:
        super(UploaderProcess, self).__init__()
        self.q_in = q_in

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Uploader: {x}")

        log.debug("Uploader: Done")

/__init__.py

import sys
import click
import logging

from binreader import upload_concurrent


@click.group()
def cli():
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )


cli.add_command(upload_concurrent.upload_data_concurrent)

cli()

/__main__.py

<empty>

/upload_concurrent.py

import logging
from multiprocessing import Queue

import click

from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess

log = logging.getLogger()


@click.command(name="upload-concurrent")
def upload_data_concurrent():

    parser_agg_wq = Queue()
    agg_upl_wq = Queue()

    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__

    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__

    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__

    parser.start()
    aggregator.start()
    uploader.start()

    parser.join()
    aggregator.join()
    uploader.join()

I have synchronous code that completes the processing, but it is far too slow at ~1 hr/GB. About 1.5 TB of data needs to be processed every two weeks.

When introducing multiprocessing I am getting the following error once per call to Process.start:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

This program is run as a module: python -m binreader upload-concurrent

I have read this question, however I am not sure where to add the if __name__ == '__main__': guard. This may not be a viable solution given this is using the click module and I'm unsure what effect that has on how the module starts/runs.

Any guidance is greatly appreciated

Upvotes: 0

Views: 472

Answers (1)

InhirCode

Reputation: 348

The condition if __name__ == '__main__': is true only when a module is run directly, not when it is imported from another module. The guard needs to go at the top level (module scope) of the code that starts the program. That is what the error message calls the "main module"; the term is correct, but it seems a little confusing to me.

The guard goes around the first call in the program, which here is the cli() call.
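To illustrate where the guard sits, here is a minimal standard-library sketch, not the asker's actual code. The names forward and main are hypothetical; main() stands in for the click entry point (the cli() call in the question), and the single worker stands in for the parser/aggregator/uploader processes.

```python
import multiprocessing as mp


def forward(q_in, q_out):
    """Copy items from q_in to q_out until the None sentinel arrives."""
    while (item := q_in.get()) is not None:
        q_out.put(item)
    q_out.put(None)  # pass the sentinel downstream


def main():
    """Hypothetical stand-in for the click entry point (cli() in the question)."""
    q_in, q_out = mp.Queue(), mp.Queue()
    worker = mp.Process(target=forward, args=(q_in, q_out), name="Forwarder")
    worker.start()
    for item in [1, 2, 3, None]:
        q_in.put(item)
    # Drain the output before join() so the child never blocks on a full pipe.
    results = list(iter(q_out.get, None))
    worker.join()
    return results


# With the "spawn" start method, each child re-imports the main module.
# Without this guard, that import would call main() again and try to start
# another process while the child is still bootstrapping.
if __name__ == "__main__":
    print(main())  # prints [1, 2, 3]
```

In the question's layout, the unguarded cli() at the bottom of __init__.py plays the role of the unguarded call. Since python -m binreader runs __main__.py as the main module, the guarded cli() call would presumably live there, with __init__.py only defining cli. Treat that as an assumption about this package rather than a tested fix.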

Upvotes: 1
