Reputation: 156
I'm creating a program that processes files, sometimes very large files on the order of gigabytes, and it takes a lot of time. At first I tried ThreadPoolExecutor, which improved the speed somewhat: a ~200 MB file that takes about 3 minutes synchronously finishes in about 130 seconds. That's too slow for me. Then I tried ProcessPoolExecutor and it did wonders: the same work was done in about 12-18 seconds. That makes sense, since the task is eating a lot of CPU. The problem now is visualizing the progress of the task. I used tqdm. With threads everything works wonderfully and I can see the beautiful progress. But when I switch to ProcessPoolExecutor, the program crashes.
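The gap between threads and processes is what the GIL predicts for CPU-bound work: a pure-Python loop over bytes cannot run in parallel threads, but it can in separate processes. A stripped-down sketch (the chunk sizes and worker count here are arbitrary) that reproduces the difference:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def crunch(chunk: bytes) -> int:
    # purely CPU-bound work: touch every byte, no I/O
    total = 0
    for b in chunk:
        total ^= b
    return total

if __name__ == "__main__":
    chunks = [bytes(2_000_000)] * 8  # eight dummy 2 MB chunks
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.perf_counter()
        with pool_cls(max_workers=4) as executor:
            list(executor.map(crunch, chunks))
        print(pool_cls.__name__, round(time.perf_counter() - start, 2), "s")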
My actual code looks as follows:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
import os
import time
class FileProcessor:
    NJOBS = 8  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @classmethod
    def _process_chunk(cls, chunk: bytes, pb: tqdm, *somemoreargs):
        # processes each byte, updates the progress bar afterwards
        array = bytearray(chunk)
        for i in range(len(array)):
            # do sam' with byte at i
            time.sleep(0.0001)  # for a large file, comment this out
            if pb:
                pb.update()
        return bytes(array)  # and some more vals

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while (schunk := chunk[i:i + schk_size]):
                yield schunk  # and some more info
                i += schk_size

        progressbar = tqdm(range(self.filesize))
        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        # executor = ProcessPoolExecutor(max_workers=os.cpu_count())
        with progressbar, file, executor:
            while (chunk := file.read(FileProcessor.CHK_SIZE)):
                futures = [executor.submit(FileProcessor._process_chunk, sc, progressbar) for sc in subchunk(chunk)]
                # futures = [executor.submit(FileProcessor._process_chunk, sc, None) for sc in subchunk(chunk)]
                for future in as_completed(futures):
                    # do something with results
                    res = future.result()
                    # progressbar.update(len(res))  # uncomment for multiprocessing
        # do final stuff
This works well with multiple threads: the progress bar fills smoothly. But when I change to multiple processes, the program crashes. I am guessing this is due to the fact that processes don't share memory space.
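(A quick way to check that guess: ProcessPoolExecutor has to pickle every submitted argument, and a tqdm instance refuses to be pickled. This is just a sanity-check sketch, not part of my program:)

import pickle
from tqdm import tqdm

pb = tqdm(total=100)
try:
    pickle.dumps(pb)  # this is what submitting pb to a worker process requires
except Exception as e:
    print("not picklable:", e)
pb.close()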
So, the question is: how can I use tqdm to show the progress smoothly while using multiprocessing? For now I am updating the bar after each task finishes, in the for future in as_completed(futures) loop, but the progress display is rather ugly, with big jumps.
Upvotes: 2
Views: 1378
Reputation: 195573
You can spawn one additional process that is responsible for updating the progress bar. To send data to this process you can use a multiprocessing Queue().
Here is the modified example:
import os
import time
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from multiprocessing import Process, Queue
from pathlib import Path

from tqdm import tqdm

queue = None

def init_queue(q):
    # runs once in each worker process, giving it a handle to the shared queue
    global queue
    queue = q

class FileProcessor:
    NJOBS = 8  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, *somemoreargs):
        for i in chunk:
            # do sam' with byte at i
            time.sleep(0.0001)
            queue.put_nowait(1)
        return "xxx"

    @staticmethod
    def _pb_updater(filesize, queue):
        # runs in its own process; drains the queue and advances the bar
        progressbar = tqdm(total=filesize)
        while True:
            how_much_to_update = queue.get()
            progressbar.update(how_much_to_update)
            progressbar.refresh()

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while schunk := chunk[i : i + schk_size]:
                yield schunk  # and some more info
                i += schk_size

        pb_queue = Queue()
        pb_updater = Process(
            target=FileProcessor._pb_updater,
            args=(self.filesize, pb_queue),
            daemon=True,
        )
        pb_updater.start()

        file = self.filepath.open(mode="rb")
        executor = ProcessPoolExecutor(
            max_workers=FileProcessor.NJOBS,
            initializer=init_queue,
            initargs=(pb_queue,),
        )
        with file, executor:
            while chunk := file.read(FileProcessor.CHK_SIZE):
                futures = [
                    executor.submit(FileProcessor._process_chunk, sc)
                    for sc in subchunk(chunk)
                ]
                for future in as_completed(futures):
                    _ = future.result()
        # do final stuff

if __name__ == "__main__":
    f = FileProcessor("./big_file.tar")
    f.perform()
Prints:
0%| | 98333/12200314880 [00:04<153:34:42, 22066.56it/s]
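Note that _process_chunk puts one message on the queue per byte, so on a multi-gigabyte file the queue traffic itself becomes a cost. A possible variant of the worker (the batch size of 4096 is an arbitrary choice) that reports progress in batches instead:

    @staticmethod
    def _process_chunk(chunk: bytes, *somemoreargs):
        BATCH = 4096  # arbitrary: one queue message per 4096 processed bytes
        pending = 0
        for i in chunk:
            # do sam' with byte at i
            pending += 1
            if pending == BATCH:
                queue.put_nowait(pending)
                pending = 0
        if pending:  # flush whatever is left over
            queue.put_nowait(pending)
        return "xxx"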
Upvotes: 0
Reputation: 44283
Since you want to use a ProcessPoolExecutor
instance (your code still shows you using a ThreadPoolExecutor
instance), the main process, which has nothing else to do except wait for submitted tasks to complete, can easily be the updater of the progress bar. You just need to arrange for your worker function/method (in your case _process_chunk
) to return an additional value: the amount to advance the progress bar by:
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from tqdm import tqdm

class FileProcessor:
    NJOBS = 4  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, *somemoreargs):
        array = bytearray(chunk)
        for b in array:
            #time.sleep(0.0001)
            ...
        # Also return the number of bytes processed:
        return bytes(array), len(array)

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while schunk := chunk[i : i + schk_size]:
                yield schunk  # and some more info
                i += schk_size

        progressbar = tqdm(total=self.filesize)
        file = self.filepath.open(mode="rb")
        executor = ProcessPoolExecutor(max_workers=FileProcessor.NJOBS)
        with progressbar, file, executor:
            while chunk := file.read(FileProcessor.CHK_SIZE):
                futures = [
                    executor.submit(FileProcessor._process_chunk, sc)
                    for sc in subchunk(chunk)
                ]
                for future in as_completed(futures):
                    # each result carries the number of bytes done; use it
                    # to advance the bar from the main process
                    _, bytes_processed = future.result()
                    progressbar.update(bytes_processed)
        # do final stuff

if __name__ == "__main__":
    f = FileProcessor("some_big_file.tar")
    f.perform()
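With this approach the bar advances once per completed subchunk, i.e. in steps of roughly CHK_SIZE // NJOBS (20 MB with the constants above). If those jumps are still too coarse for you, cut the subchunks smaller than the worker count, for example (the oversubscription factor of 8 is an arbitrary choice):

        def subchunk(chunk: bytes):
            # more, smaller subchunks per worker -> finer progress steps
            schk_size = len(chunk) // (FileProcessor.NJOBS * 8) or len(chunk)
            i = 0
            while schunk := chunk[i : i + schk_size]:
                yield schunk
                i += schk_size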
Upvotes: 2