AdamF
AdamF

Reputation: 2930

How to parallelise python script for processing 10,000 files?

I have more than 10,000 C files, which i need to pass each one of them to some application foo.exe for processing and generating dis-assembly files for each one of the C files,i.e. at the end of this process i will have 10,000 lst/output files! Assuming that, this process is not IO-Bound (despite the fact that foo.exe Write new lst file to disk for each c file. is it right assumption ?).

My task is

To implement parallel python program to get the job done in minimum time! by utilizing all cpu cores for this task.

My approach

I have implemented this program and it works for me, the pseudo code listed below:

  1. iterate over all c files and push the abs path for each one in a global List, files_list.
  2. calculate the cpu logical cores number (with psutil py module), this will be the maximum threads to be dispatched later. lets assume it is 8 threads.
  3. generate new list, workers_list (its a list of lists) which contains the intervals or indexes (L_index, R_index) yielded from division of files_list by 8 . e.g. if i have 800 c files then workers_list will look like this: workers_list = [[0-99],[100,199],...,[700,799]].
  4. dispatch 8 thread, workers, which each one will manipulate single entry in workers_list. each thread will open process (subprocess.call(...)) and call the foo.exe on the current c file.

posting the relevant code below:

The relevant Code

import multiprocessing
import subprocess
import psutil
import threading
import os

class LstGenerator(object):
  def __init__(self):
    self.elfdumpExePath = r"C:\.....\elfdump.exe" #abs path to the executable 
    self.output_dir = r"C:\.....\out"             #abs path to where i want the lst files to be generated
    self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
  
  def slice(self, files):
    files_len = len(files)
    j = psutil.cpu_count()
    slice_step = files_len / j
    workers_list = []
    lhs = 0
    rhs = slice_step
    while j:
      workers_list.append(files[lhs:rhs])
      lhs += slice_step
      rhs += slice_step
      j -= 1
      if j == 1:  # last iteration
        workers_list.append(files[lhs:files_len])
        break
    for each in workers_list:  #for debug only
      print len(each)
    return workers_list
  
  def disassemble(self, objectfiles):
    for each_object in objectfiles:
      cmd = "{elfdump} -T {object} -o {lst}".format(
        elfdump=self.elfdumpExePath,
        object=each_object,
        lst=os.path.join(self.outputs, os.path.basename(each_object).rstrip('o') + 'lst'))
      p = subprocess.call(cmd, shell=True)
  
  def execute(self):
    class FuncThread(threading.Thread):
      def __init__(self, target, *args):
        self._target = target
        self._args = args
        threading.Thread.__init__(self)
    
    workers = []
    for portion in self.slice(self.files):
      workers.append(FuncThread(self.disassemble, portion))

    # dispatch the workers
    for worker in workers:
      worker.start()
    
    # wait or join the previous dispatched workers
    for worker in workers:
      worker.join()
  
  

if __name__ == '__main__':
  lst_gen = LstGenerator()
  lst_gen.execute()

My Questions

  1. can i do this in more efficient way?
  2. do python have standard lib or module that can get the job done and reduce my code/logic complexity? maybe multiprocessing.Pool?

running on windows, with python 2.7!

thanks

Upvotes: 1

Views: 621

Answers (2)

jkr
jkr

Reputation: 19250

Yes, multiprocessing.Pool can help with this. That also does the work of sharding the list of inputs for each CPU. Here is python code (untested) that should get you on your way.

import multiprocessing
import os
import subprocess

def convert(objectfile):
    elfdumpExePath = "C:\.....\elfdump.exe"
    output_dir = "C:\.....\out"

    cmd = "{elfdump} -T {obj} -o {lst}".format(
        elfdump=elfdumpExePath,
        obj=objectfile,
        lst=os.path.join(output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
    return cmd

files = ["foo.c", "foo1.c", "foo2.c"]

p = multiprocessing.Pool()
outputs = p.map(convert, files)

Keep in mind that your worker function (convert above) must accept one argument. So if you need to pass in an input path and output path, that must be done as a single argument, and your list of filenames will have to be transformed into a list of pairs, where each pair is input and output.

The answer above is for python 2.7, but keep in mind that python2 has reached its end-of-life. In python3, you can use multiprocessing.Pool in a with statement so that it cleans up on its own.

Upvotes: 4

AdamF
AdamF

Reputation: 2930

Posting an answer for my question after strugling with it for a while, and noticing that i can import concurrent.futures in python2.x! this approach reduce code complexity ro minimum and even improve the execution time. unlike my first thoughts these processes is more IO-bound than cpu-bound! yet, the the time efficiency that i have get was enough convinient for run the program with multi-process.


concurrent.futures

The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.

submit(fn, *args, **kwargs)

Schedules the callable, fn, to be executed as fn(*args **kwargs) and returns a Future object representing the execution of the callable.

for further reading please folow the like below: parallel tasks with concurrent.futures


import multiprocessing
import subprocess
import psutil
import threading
import os
import concurrent.futures

class LstGenerator(object):
  def __init__(self):
    self.elfdumpExePath = r"C:\.....\elfdump.exe" #abs path to the executable 
    self.output_dir = r"C:\.....\out"             #abs path to where i want the lst files to be generated
    self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
  
  def disassemble(self, objectfile):
    cmd = "{elfdump} -T {object} -o {lst}".format(
      elfdump=self.elfdumpExePath,
      object=objectfile,
      lst=os.path.join(self.outputs, os.path.basename(objectfile).rstrip('o') + 'lst'))
    return subprocess.call(cmd, shell=True,stdout=subprocess.PIPE) 
  
  def execute(self):
    with concurrent.futures.ProcessPoolExecutor() as executor:
      results = [executor.submit(self.disassemble(file)) for file in self.files]
  
  

if __name__ == '__main__':
  lst_gen = LstGenerator()
  lst_gen.execute()

Upvotes: 2

Related Questions