Reputation: 1141
I would like to compare two folders that have the same path structure and the same files in all the subfolders. The folders are quite big: about 80 GB in size with about 8000 files.
I would like to make sure each pair of corresponding files under the two top directories has the same MD5 checksum. I wrote a simple tree DFS function that searches for all the files under the two directories, sorts them by file size, and stores them in two lists.
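For reference, this is a simplified sketch of that collection step (using os.walk instead of my own DFS; the two top-level paths are placeholders):

import os

def collect_files(top):
    # Walk the tree and return all file paths sorted by file size.
    paths = []
    for root, dirs, files in os.walk(top):
        for name in files:
            paths.append(os.path.join(root, name))
    # Sort by size so corresponding files under the two top
    # directories end up at the same positions in both lists.
    return sorted(paths, key=os.path.getsize)

queue_a = collect_files('/path/to/top_a')   # placeholder paths
queue_b = collect_files('/path/to/top_b')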
When I iterated through the lists I found it very time consuming to do all the comparisons, and the CPU usage was low as well.
I thought the multiprocessing module would be a good fit for this case. This is my multiprocessing implementation:
from multiprocessing import Pool, cpu_count
import hashlib

def calc_md5(item):
    m = hashlib.md5()
    with open(item, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            m.update(chunk)
    return m.hexdigest()

def worker(args):
    a, b = args
    return calc_md5(a) == calc_md5(b)

def multi_compare(queue_a, queue_b, thread):
    pool = Pool(processes=cpu_count() - 1)
    # Task iterable
    task = zip(queue_a, queue_b)
    # Multiprocessing
    for retval in pool.imap_unordered(worker, task, chunksize=5):
        if not retval:
            print "Bad Detected"
Here queue_a and queue_b are the paths of the files to be compared, sorted by file size. I expected higher CPU usage and better performance from this multiprocessing approach, but that does not seem to be the case: while the simple sequential iteration takes about 3200 seconds to finish, the multiprocessing method takes about 4600 seconds.
I am curious why this is the case. Is this a good use case for multiprocessing? What is the bottleneck causing the bad performance in my code? Is there a way to improve it?
Edit: I set the chunksize by gut feeling. I guess I can change it to the length of queue_a or queue_b divided by the number of worker processes, and reorder the task queue so that the first 1/4 of it contains the queue_a[0::thread] / queue_b[0::thread] elements, the next 1/4 the queue_a[1::thread] / queue_b[1::thread] elements, and so on. This should feed similarly sized tasks to all the workers and keep them busy all the time. I don't know whether this is a good way to gain extra performance; I am still testing it. This is roughly what I mean by that rearrangement (a sketch on top of the worker function above; the numbers are not tuned):

from multiprocessing import Pool

def interleaved_compare(queue_a, queue_b, thread):
    # Stride through the size-sorted lists so every worker's chunk
    # gets a similar mix of large and small files.
    tasks = []
    for offset in xrange(thread):
        tasks.extend(zip(queue_a[offset::thread], queue_b[offset::thread]))
    # One contiguous chunk per worker, as described above.
    chunk = max(1, len(queue_a) // thread)
    pool = Pool(processes=thread)
    for retval in pool.imap_unordered(worker, tasks, chunksize=chunk):
        if not retval:
            print "Bad Detected"
    pool.close()
    pool.join()
Edit: The test from the edit above takes about 4000 seconds to finish. That is slightly better than chunksize = 5, but still worse than the serial method. So I would like to ask: how can I determine the bottleneck of this multiprocessing program?
Thank you!
Upvotes: 1
Views: 3818
Reputation: 2035
It's the I/O that limits the performance. The MD5 algorithm is by now too easy a task for CPUs. The following code measures MD5 throughput in GB/s.
import time
import hashlib
from multiprocessing import Pool

def worker(x):
    # Hash ~1 GB of in-memory data: a 1 MB block updated 1024 times.
    data = bytearray(xrange(256)) * 4 * 1024
    md5 = hashlib.md5()
    for _ in xrange(1024):
        md5.update(data)

if __name__ == '__main__':
    num_workers = 4
    pool = Pool(num_workers)
    start = time.time()
    pool.map(worker, xrange(num_workers))
    print num_workers / (time.time() - start), 'GB/s'
A relatively weak modern mobile Intel i3 CPU (2 cores, 4 threads) is able to hash at a rate of about 1 GB per second. Compare this with SATA3 bandwidth, which is about 600 MB/s. So even with an SSD, the disk interface will limit the hashing speed.
On HDDs the situation is even worse: multiple readers force the disk to move its read heads back and forth, causing more delays than if only one reader thread were used. It's like reading a heavily fragmented file.
When the dataset is not that large, the OS's file cache can help greatly. That's not your case, though.
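If you want to confirm that the disk, not the hashing, is the bottleneck, you can time plain sequential reads over a sample of your files with no MD5 at all and compare the result with the numbers above. A rough sketch (sample_paths stands for whatever subset of your file list you pick, ideally files that are not already in the file cache):

import time

def read_throughput(paths):
    # Read the files sequentially without hashing and report GB/s.
    total = 0
    start = time.time()
    for path in paths:
        with open(path, 'rb') as f:
            while True:
                chunk = f.read(1024 * 1024)
                if not chunk:
                    break
                total += len(chunk)
    return total / (time.time() - start) / 1e9

# sample_paths: a subset of the files to compare (placeholder)
print read_throughput(sample_paths), 'GB/s'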
Upvotes: 1