Reputation: 1815
I am trying to grasp multithreading/multiprocessing using concurrent.futures.
I have tried the following sets of code. I understand that I will always be limited by disk I/O, but I want to maximize my RAM and CPU usage as much as possible.
What is the most commonly used/best method for large-scale processing?
How do you use concurrent.futures for processing large datasets?
Is there a preferred method other than the ones below?
Method 1:
for folders in os.path.isdir(path):
    p = multiprocessing.Process(pool.apply_async(process_largeFiles(folders)))
    jobs.append(p)
    p.start()
Method 2:
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 100)
Method 3:
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 10)
Should I attempt to use process pool and thread pool together?
Method (thought):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as process:
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread:
        for folders in os.path.isdir(path):
            process.submit(thread.submit(process_largeFiles(folders), 100), 10)
What is the most efficient method to maximize my RAM and CPU usage in the broadest use case?
I am aware that starting processes takes a bit of time, but would that be outweighed by the size of the files being processed?
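For reference, executor.submit expects the callable and its arguments separately, e.g. executor.submit(process_largeFiles, folder); writing process_largeFiles(folders) inside submit runs the function immediately in the submitting process and then submits whatever it returns as if it were the callable. A minimal sketch of Method 3 in that form, assuming process_largeFiles takes a single directory path (run_method_3 is a hypothetical wrapper, not part of the code above):

import os
import concurrent.futures

def process_largeFiles(folder):
    # Placeholder for the question's function; assumed to take one directory path.
    ...

def run_method_3(path):
    # os.path.isdir(path) returns a bool; os.scandir is one way to list subdirectories.
    folders = [entry.path for entry in os.scandir(path) if entry.is_dir()]
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        # Pass the callable and its argument separately so the call runs
        # in a worker process instead of the submitting process.
        futures = [executor.submit(process_largeFiles, folder) for folder in folders]
        return [f.result() for f in concurrent.futures.as_completed(futures)]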
Upvotes: 3
Views: 3672
Reputation: 23753
Use ThreadPoolExecutor to open and read the files, then use ProcessPoolExecutor to process the data.
import concurrent.futures
from collections import deque

TPExecutor = concurrent.futures.ThreadPoolExecutor
PPExecutor = concurrent.futures.ProcessPoolExecutor

def get_file(path):
    with open(path) as f:
        data = f.read()
    return data

def process_large_file(s):
    return sum(ord(c) for c in s)

# placeholder paths
files = [filename1, filename2, filename3, filename4, filename5,
         filename6, filename7, filename8, filename9, filename0]

results = []
completed_futures = deque()

def callback(future, completed=completed_futures):
    completed.append(future)

with TPExecutor(max_workers=4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
    with PPExecutor() as process_pool_executor:
        for data_future in concurrent.futures.as_completed(data_futures):
            future = process_pool_executor.submit(process_large_file, data_future.result())
            future.add_done_callback(callback)
            # collect any that have finished
            while completed_futures:
                results.append(completed_futures.pop().result())
# the executor's shutdown waits for the remaining futures, so drain
# whatever finished after the last loop iteration
while completed_futures:
    results.append(completed_futures.pop().result())
I used a done callback so the loop doesn't have to wait on completed futures. I have no idea how that affects efficiency - I used it mainly to simplify the logic/code in the as_completed loop.
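For comparison, here is a sketch of the same pipeline without the done callback, using only the names defined above (get_file, process_large_file, files): collect the process-pool futures in a list and drain them with a second as_completed pass once everything has been submitted.

with TPExecutor(max_workers=4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
    with PPExecutor() as process_pool_executor:
        # submit each file's data for processing as soon as its read completes
        work_futures = [
            process_pool_executor.submit(process_large_file, f.result())
            for f in concurrent.futures.as_completed(data_futures)
        ]
        # gather results; this waits for stragglers here instead of collecting
        # them inside the submission loop
        results = [f.result() for f in concurrent.futures.as_completed(work_futures)]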
If you need to throttle file or data submissions due to memory constraints, it would need to be refactored. Depending on file read time and processing time it is hard to say how much data will be in memory at any given moment. I think gathering results inside the as_completed loop should help mitigate that. data_futures may start completing while the ProcessPoolExecutor gets set up - that sequencing may need to be optimized.
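One possible way to do that throttling, sketched under the assumption that at most max_pending raw file reads should be outstanding at once (throttle_reads and its parameters are hypothetical, not part of the answer above): submit an initial batch of reads and use concurrent.futures.wait with FIRST_COMPLETED to top the batch up as reads finish.

import itertools
import concurrent.futures

def throttle_reads(paths, read, work, max_pending=4):
    # Keep at most max_pending file reads in flight so raw file contents
    # do not all sit in memory at the same time.
    paths = iter(paths)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_pending) as tp, \
         concurrent.futures.ProcessPoolExecutor() as pp:
        pending = {tp.submit(read, p) for p in itertools.islice(paths, max_pending)}
        work_futures = []
        while pending:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for read_future in done:
                # hand the data to the process pool and start the next read
                work_futures.append(pp.submit(work, read_future.result()))
                next_path = next(paths, None)
                if next_path is not None:
                    pending.add(tp.submit(read, next_path))
        return [f.result() for f in concurrent.futures.as_completed(work_futures)]

With the helpers above it could be called as throttle_reads(files, get_file, process_large_file). Note that this only bounds the reads - data queued for the process pool can still accumulate if processing is the slower side.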
Upvotes: 1