Reputation: 1813
My goal is to iterate through a directory and compute the MD5 of every file within it. I used code from a solution to a similar problem:
Parallel file matching, Python
import os
import re
import sys
import time
import md5
from stat import S_ISREG
import multiprocessing
global queue
size_limit = 500000
target = sys.argv[1]
############Analysis and Multiprocessing####################
def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and small enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    fp = open(fname, 'rt')
    # read the whole file at once
    contents = fp.read()
    hash = md5.md5(contents)
    global queue
    print "enqueue"
    queue.put(fname+'-'+hash.hexdigest())
################MAIN MAIN MAIN#######################
# kick off processes to md5 the files and wait until completion
queue = multiprocessing.Queue()
pool = multiprocessing.Pool()
pool.map(worker_search_fn, files_to_search(target))
pool.close()
pool.join()

# Should be done, now let's do our analysis
while not queue.empty():
    print queue.get()
I added the "enqueue" print statement for debugging purposes, and I noticed that the code does indeed lock up when recursing a large directory tree. I am not sure whether two processes are trying to access the queue at the same time, causing a deadlock.
Perhaps there is a better way to do this? The structure does not have to be a queue, but it has to be lock-free so as to take full advantage of multiprocessing. I want to recurse through and MD5 a directory in parallel, and once that is complete, do something with the list as a whole. For debugging I'm just printing the completed queue. Any suggestions?
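One rough way to check where the time goes, as a hypothetical debugging sketch that reuses files_to_search() and target from the code above, is to time the file-discovery pass by itself (walk plus stat, no hashing, no queue):

import time

start = time.time()
count = sum(1 for _ in files_to_search(target))  # walk + stat only, no md5
print("found %d candidate files in %.1f seconds" % (count, time.time() - start))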
Upvotes: 0
Views: 3203
Reputation: 414745
It is unclear whether your program is I/O- or CPU-bound, i.e., a single process could perform better than several processes if the task is I/O-bound, e.g., by minimizing the number of disk seeks. You can check this by specifying different nprocesses
values (below) and seeing what gives better results in your case.
You don't need a queue in this case:
#!/usr/bin/env python
import os
import sys
from hashlib import md5
from multiprocessing import Pool, freeze_support
from stat import S_ISREG

def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_process(topdir, size_limit):
    """yield up full pathname for only files we want to process"""
    for fname in walk_files(topdir):
        try: sr = os.stat(fname)
        except OSError: pass
        else:
            # if it is a regular file and small enough, we want to process it
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname

def md5sum(fname):
    with open(fname, 'rb') as fp:
        # read the whole file at once
        contents = fp.read()
        hash = md5(contents)
    return fname, hash.hexdigest()

def main(argv=None):
    if argv is None:
        argv = sys.argv
    topdir = argv[1]
    size_limit = 500000
    nprocesses = 1
    pool = Pool(processes=nprocesses)
    files = files_to_process(topdir, size_limit)
    for fname, hexdigest in pool.imap_unordered(md5sum, files):
        print("%s\t%s" % (fname, hexdigest))

if __name__=="__main__":
    freeze_support()
    main()
$ python md5sum.py .
./md5sum.py 9db44d3117673790f1061d4b8f00e8ce
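To see whether more processes actually help, a rough, hypothetical benchmark along these lines could time the hashing pass for a few pool sizes. It assumes md5sum(), files_to_process(), and walk_files() from the script above are defined in the same file, with this replacing the original main block, and the process counts chosen here are arbitrary:

import sys
import time
from multiprocessing import Pool, freeze_support

def time_pool(topdir, size_limit, nprocesses):
    """Time one full hashing pass using nprocesses worker processes."""
    files = list(files_to_process(topdir, size_limit))
    pool = Pool(processes=nprocesses)
    start = time.time()
    for _ in pool.imap_unordered(md5sum, files):
        pass  # discard results; we only care about elapsed time
    pool.close()
    pool.join()
    return time.time() - start

if __name__ == "__main__":
    freeze_support()
    topdir = sys.argv[1]
    for n in (1, 2, 4):
        print("%d process(es): %.1f s" % (n, time_pool(topdir, 500000, n)))

If the timings barely change (or get worse) as the process count grows, the task is likely I/O-bound and a single process is a reasonable choice.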
Upvotes: 3
Reputation: 1119
A large directory tree takes a long time for walk_files() to traverse; it is not a deadlock.
Also, remove pool.join(): multiprocessing.Pool().map() blocks until the result is ready, so you don't need pool.join().
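To illustrate, a minimal sketch of the question's main section without the queue and without pool.join() could look like this. Here worker_md5 is a hypothetical replacement for worker_search_fn that returns its result instead of calling queue.put(), and it reuses the md5 and multiprocessing imports, files_to_search(), and target from the question's code:

def worker_md5(fname):
    # hash the whole file and hand the result back via map()'s return value
    with open(fname, 'rb') as fp:
        contents = fp.read()
    return fname + '-' + md5.md5(contents).hexdigest()

pool = multiprocessing.Pool()
results = pool.map(worker_md5, files_to_search(target))  # blocks until all workers finish
pool.close()
for item in results:
    print(item)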
Upvotes: 1