Reputation: 318
I have the following files:
binreader/
├─ packet/
│ ├─ __init__.py
│ ├─ aggregator.py
│ ├─ parser.py
│ ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py
Code that reproduces the error:
/packet/__init__.py
<empty>
/packet/aggregator.py
from multiprocessing import Queue, Process
import logging

log = logging.getLogger()

class AggregatorProcess(Process):
    def __init__(self, q_in: Queue, q_out: Queue):
        super(AggregatorProcess, self).__init__()
        self.q_in = q_in
        self.q_out = q_out

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)
        log.debug("Aggregator: Done")
        self.q_out.put(None)
/packet/parser.py
from multiprocessing import Queue, Process
import logging
from typing import List

log = logging.getLogger()

class ParserProcess(Process):
    """Multiprocessing version of the parser class"""

    def __init__(self, data: List, q_out: Queue):
        super(ParserProcess, self).__init__()
        self.q_out = q_out
        self.data = data

    def run(self):
        for x in self.data:
            log.debug(f"Parser: {x}")
            self.q_out.put(x)
        log.debug("Parser: Done")
        self.q_out.put(None)
/packet/uploader.py
from multiprocessing import Queue, Process
import logging

log = logging.getLogger()

class UploaderProcess(Process):
    def __init__(self, q_in: Queue) -> None:
        super(UploaderProcess, self).__init__()
        self.q_in = q_in

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Uploader: {x}")
        log.debug("Uploader: Done")
/__init__.py
import sys
import click
import logging

from binreader import upload_concurrent

@click.group()
def cli():
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )

cli.add_command(upload_concurrent.upload_data_concurrent)
cli()
/__main__.py
<empty>
/upload_concurrent.py
from multiprocessing import Queue
import logging
import click

from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess

log = logging.getLogger()

@click.command(name="upload-concurrent")
def upload_data_concurrent():
    parser_agg_wq = Queue()
    agg_upl_wq = Queue()
    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__
    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__
    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__
    parser.start()
    aggregator.start()
    uploader.start()
    parser.join()
    aggregator.join()
    uploader.join()
I have synchronous code that completes the processing, but it is far too slow at ~1 hr/GB, and about 1.5 TB of data needs to be processed every two weeks.
After introducing multiprocessing, I get the following error once per call to Process.start:
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
This program is run as a module: python -m binreader upload-concurrent
I have read this question, but I am not sure where to add the if __name__ == '__main__': guard. It may not even be a viable solution, since the program uses the click module and I'm unsure how that affects the way the module starts and runs.
Any guidance is greatly appreciated.
Upvotes: 0
Views: 472
Reputation: 348
The if __name__ == '__main__': condition is true when a program is run directly rather than imported from another module, so it needs to go in the main scope of the code. "Main module", as the error text puts it, is correct but seems a little confusing to me; in practice, the guard wraps the first call the program makes.
Since you run the package with python -m binreader upload-concurrent, the main module is binreader/__main__.py, which is currently empty. Your cli() call sits at module level in __init__.py, so it executes every time the package is imported, including when each spawned child process re-imports the package, and that is what raises the bootstrapping error.
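A minimal sketch of that change, assuming the click group itself stays in __init__.py: drop the cli.add_command(...) and cli() lines from the bottom of __init__.py and move them into __main__.py behind the guard.
/__main__.py
from binreader import cli, upload_concurrent

if __name__ == "__main__":
    # True when executed via `python -m binreader ...`; a child process
    # started with the spawn method re-imports this module under a
    # different __name__, so it never re-enters the CLI or calls
    # Process.start() again.
    cli.add_command(upload_concurrent.upload_data_concurrent)
    cli()
click itself should not interfere here: the @click.group() and @click.command() decorators only build command objects at import time, and nothing runs until cli() is invoked from inside the guarded block.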
Upvotes: 1