user974896

Reputation: 1813

Parallel queueing - Multiprocessing pool, python

My goal is to go iterate through a directory and compute the MD5 of all the files within. I used code for a solution to a similar problem

Parallel file matching, Python

import os
import re
import sys
import time
import md5

from stat import S_ISREG

import multiprocessing

global queue
size_limit = 500000

target = sys.argv[1]



############Analysis and Multiprocessing####################

def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and small enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    fp = open(fname, 'rt')
    # read the whole file at once
    contents = fp.read()
    hash = md5.md5(contents)
    global queue
    print "enqueue"
    queue.put(fname + '-' + hash.hexdigest())

################MAIN MAIN MAIN#######################

#kick off processes to md5 the files and wait till completion

queue = multiprocessing.Queue()
pool = multiprocessing.Pool()
pool.map(worker_search_fn, files_to_search(target))
pool.close()
pool.join()

#Should be done, now let's do our analysis
while not queue.empty():
    print queue.get()

I added the "print enqueue" statement for debugging purposes, and I notice that the code does indeed lock up when recursing a large directory tree. I am not sure whether two processes are trying to access the queue at the same time, causing a deadlock.

Perhaps there is a better way to do this? The structure does not have to be a queue, but it has to be lock-free so as to take full advantage of multiprocessing. I want to recurse through a directory and md5 its files in parallel, and once that is complete, do something with the list as a whole. For debugging I'm just printing the completed queue. Any suggestions?

Upvotes: 0

Views: 3203

Answers (2)

jfs

Reputation: 414745

It is unclear whether your program is I/O-bound or CPU-bound; if the task is I/O-bound, a single process could perform better than several processes, e.g., by minimizing the number of disk seeks. You can check this by specifying different nprocesses values (below) and seeing what gives better results in your case; a small timing sketch follows the example output further down.

You don't need a queue in this case:

#!/usr/bin/env python
import os
import sys

from hashlib         import md5
from multiprocessing import Pool, freeze_support
from stat            import S_ISREG

def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_process(topdir, size_limit):
    """yield up full pathname for only files we want to process"""
    for fname in walk_files(topdir):
        try: sr = os.stat(fname)
        except OSError: pass
        else:
            # if it is a regular file and small enough, we want to process it
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname

def md5sum(fname):
    with open(fname, 'rb') as fp:
        # read all file at once
        contents = fp.read()
        hash = md5(contents)
        return fname, hash.hexdigest()

def main(argv=None):
    if argv is None:
        argv = sys.argv
    topdir = argv[1]
    size_limit = 500000
    nprocesses = 1

    pool = Pool(processes=nprocesses)
    files = files_to_process(topdir, size_limit)
    for fname, hexdigest in pool.imap_unordered(md5sum, files):
        print("%s\t%s" % (fname, hexdigest))

if __name__=="__main__":
    freeze_support()
    main()

Example

$ python md5sum.py .
./md5sum.py 9db44d3117673790f1061d4b8f00e8ce
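
If you want to compare different nprocesses values, as suggested at the top of this answer, you could time one run per value. This is only a rough sketch, assuming the script above is saved as md5sum.py in the same directory (the 1/2/4/8 values are just examples):

#!/usr/bin/env python
import sys
import time

from multiprocessing import Pool

from md5sum import files_to_process, md5sum  # the script above, saved as md5sum.py

def time_run(nprocesses, topdir, size_limit=500000):
    # build the file list once so every run hashes exactly the same files
    files = list(files_to_process(topdir, size_limit))
    start = time.time()
    pool = Pool(processes=nprocesses)
    for _ in pool.imap_unordered(md5sum, files):
        pass  # discard the results; we only care about elapsed time
    pool.close()
    pool.join()
    return time.time() - start

if __name__ == "__main__":
    topdir = sys.argv[1]
    for n in (1, 2, 4, 8):
        print("%d process(es): %.2f seconds" % (n, time_run(n, topdir)))

Keep in mind that the OS caches file data, so the first run is usually slower than the rest; repeat the comparison a couple of times before drawing conclusions.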

Upvotes: 3

Frank Fang

Reputation: 1119

It is not a deadlock; a large directory tree simply takes a long time for walk_files() to traverse.
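
One quick way to confirm that is to time the traversal alone; a minimal sketch (it only counts files, no hashing):

import os
import sys
import time

def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir (same as in the question)"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            yield os.path.join(dirpath, fname)

start = time.time()
count = sum(1 for _ in walk_files(sys.argv[1]))  # traversal only, no MD5 work
print("walked %d files in %.1f seconds" % (count, time.time() - start))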

And...

remove pool.join()

multiprocessing.Pool().map() blocks until the result is ready, so you don't need pool.join().
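
A tiny illustration of that blocking behaviour (just a sketch; square() is a made-up worker function):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    pool = Pool()
    results = pool.map(square, range(10))  # returns only after every result is ready
    pool.close()                           # no pool.join() needed before using the results
    print(results)                         # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]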

Upvotes: 1
