johnnyb

Reputation: 1815

fastest method of large file processing using concurrent futures python 3.5

I am trying to grasp multithreading/multiprocessing using concurrent futures.

I have tried the following sets of code. I understand that disk I/O will always be a bottleneck, but I want to maximize my RAM and CPU usage to the fullest extent possible.

What is the most common/best method for large-scale processing?

How do you use concurrent futures for processing large datasets?

Is there a more preferred method than the ones below?

Method 1:

for folders in os.listdir(path):
    p = multiprocessing.Process(target=process_largeFiles, args=(folders,))
    jobs.append(p)
    p.start()

Method 2:

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    for folders in os.listdir(path):
        executor.submit(process_largeFiles, folders)

Method 3:

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for folders in os.listdir(path):
        executor.submit(process_largeFiles, folders)

Should I attempt to use process pool and thread pool together?

Method (thought):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as process:
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread:
        for folders in os.listdir(path):
            process.submit(thread.submit(process_largeFiles, folders))

What is the most efficient method to maximize my RAM and CPU usage in the broadest use case?

I am aware that starting processes takes a bit of time, but would that cost be outweighed by the size of the files being processed?

Upvotes: 3

Views: 3672

Answers (1)

wwii

Reputation: 23753

Use ThreadPoolExecutor to open and read the files, then use ProcessPoolExecutor to process the data.

import concurrent.futures
from collections import deque

TPExecutor = concurrent.futures.ThreadPoolExecutor
PPExecutor = concurrent.futures.ProcessPoolExecutor
def get_file(path):
    with open(path) as f:
        data = f.read()
    return data

def process_large_file(s):
    return sum(ord(c) for c in s)

files = [filename1, filename2, filename3, filename4, filename5,
         filename6, filename7, filename8, filename9, filename0]

results = []
completed_futures = deque()

def callback(future, completed=completed_futures):
    completed.append(future)

with TPExecutor(max_workers=4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
with PPExecutor() as process_pool_executor:
    for data_future in concurrent.futures.as_completed(data_futures):
        future = process_pool_executor.submit(process_large_file, data_future.result())
        future.add_done_callback(callback)
        # collect any that have finished
        while completed_futures:
            results.append(completed_futures.pop().result())
# pick up anything that finished after the last loop iteration
while completed_futures:
    results.append(completed_futures.pop().result())

I used a done callback so the loop wouldn't have to wait on completed futures. I have no idea how that affects efficiency; I used it mainly to simplify the logic/code in the as_completed loop.
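
For comparison, a callback-free variant of the same idea is sketched below: collect the process-pool futures in a list and run a second as_completed loop over them. This is only a sketch and assumes the get_file, process_large_file and files definitions from the code above.

import concurrent.futures

# sketch without the done callback; get_file, process_large_file and files
# are assumed to be defined as in the code above
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
    with concurrent.futures.ProcessPoolExecutor() as process_pool_executor:
        # hand each file's contents to the process pool as soon as it is read
        process_futures = [
            process_pool_executor.submit(process_large_file, f.result())
            for f in concurrent.futures.as_completed(data_futures)]
        # gather processed results as they finish
        results = [f.result()
                   for f in concurrent.futures.as_completed(process_futures)]

The trade-off is that nothing lands in results until every read has been handed to the process pool, whereas the callback version drains completed work as it goes.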

If you need to throttle file or data submissions due to memory constraints, this would need to be refactored. Depending on file read time and processing time, it is hard to say how much data will be in memory at any given moment. I think gathering results inside the as_completed loop should help mitigate that. data_futures may start completing while the ProcessPoolExecutor gets set up; that sequencing may need to be optimized.
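
If throttling is needed, one possible shape for that refactor is sketched below: keep at most a fixed number of file reads in flight, and start a new read only when a finished one has been handed off to the process pool. This is a sketch, not the code above; process_all and max_in_flight are made-up names, and get_file, process_large_file and files are assumed from the code above.

import concurrent.futures
from collections import deque

# sketch of throttled submission; process_all and max_in_flight are
# illustrative names, get_file/process_large_file are assumed from above
def process_all(paths, max_in_flight=4):
    results = []
    pending_paths = deque(paths)
    process_futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as readers, \
         concurrent.futures.ProcessPoolExecutor() as workers:
        # prime the pump: start at most max_in_flight reads
        read_futures = set()
        while pending_paths and len(read_futures) < max_in_flight:
            read_futures.add(readers.submit(get_file, pending_paths.popleft()))
        # each time a read finishes, hand its data to the process pool
        # and start the next read, so only a few files are in memory at once
        while read_futures:
            done, read_futures = concurrent.futures.wait(
                read_futures, return_when=concurrent.futures.FIRST_COMPLETED)
            for data_future in done:
                process_futures.append(
                    workers.submit(process_large_file, data_future.result()))
                if pending_paths:
                    read_futures.add(
                        readers.submit(get_file, pending_paths.popleft()))
        # collect the processed results
        for future in concurrent.futures.as_completed(process_futures):
            results.append(future.result())
    return results

results = process_all(files)

This bounds memory to roughly max_in_flight file contents plus whatever the process pool has queued at the moment.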

Upvotes: 1
