Reputation: 2807
I have some simple Python multiprocessing code as below:
files = ['a.txt', 'b.txt', 'c.txt', etc..]

def convert_file(file):
    do_something(file)

mypool = Pool(number_of_workers)
mypool.map(convert_file, files)
I have hundreds of thousands of files to be converted by convert_file, and I would like to upload every 20 converted files to a server without waiting for all of the files to be converted. How would I go about doing that?
Upvotes: 2
Views: 2943
Reputation: 40883
With multiprocessing you have a slight problem with how to deal with exceptions that occur within a single job. If you use the map variants then you need to be careful in how you poll for results, otherwise you might lose some if the map function is forced to raise an exception. Further, you won't even know which job was the problem unless you have special handling for any exceptions within your job. If you use the apply variants then you don't need to be as careful when getting your results, but collating the results becomes a bit more tricky. Overall, I think map is the easiest to get working, though.
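To illustrate the first point: with a plain map call, a single failing job makes the whole call raise in the parent, and the results of every other job in that call are discarded. A minimal sketch of that behaviour (the function and inputs here are made up purely for the demonstration):

from multiprocessing import Pool

def shaky_job(n):
    # stand-in for real work; one input fails to simulate a bad file
    if n == 3:
        raise ValueError('could not process input')
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        try:
            results = pool.map(shaky_job, range(10))
        except ValueError as ex:
            # map re-raises the failure in the parent; the results of the
            # other nine jobs are lost, and nothing here says which input failed
            print('a job failed:', ex)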
First, you need a special exception class, which cannot be defined in your main module, otherwise Python will have trouble serialising and deserialising it correctly.
e.g.
custom_exceptions.py
class FailedJob(Exception):
    pass
main.py
from multiprocessing import Pool
import time
import random

from custom_exceptions import FailedJob


def convert_file(filename):
    # pseudo implementation to demonstrate what might happen
    if filename == 'file2.txt':
        time.sleep(0.5)
        raise Exception
    elif filename == 'file0.txt':
        time.sleep(0.3)
    else:
        time.sleep(random.random())
    return filename  # return filename, so we can identify the job that was completed


def job(filename):
    """Wraps any exception that occurs with FailedJob so we can identify which job
    failed and why"""
    try:
        return convert_file(filename)
    except Exception as ex:
        raise FailedJob(filename) from ex


def main():
    chunksize = 4  # number of jobs before dispatch
    total_jobs = 20
    files = list('file{}.txt'.format(i) for i in range(total_jobs))

    with Pool() as pool:
        # we use imap_unordered as we don't care about order, we want the result of
        # the jobs as soon as they are done
        iter_ = pool.imap_unordered(job, files)
        while True:
            completed = []
            while len(completed) < chunksize:
                # collect results from iterator until we reach the dispatch threshold
                # or until all jobs have been completed
                try:
                    result = next(iter_)
                except StopIteration:
                    print('all child jobs completed')
                    # only break out of inner loop, might still be some completed
                    # jobs to dispatch
                    break
                except FailedJob as ex:
                    print('processing of {} job failed'.format(ex.args[0]))
                else:
                    completed.append(result)

            if completed:
                print('completed:', completed)
                # put your dispatch logic here

            if len(completed) < chunksize:
                print('all jobs completed and all job completion notifications'
                      ' dispatched to central server')
                return


if __name__ == '__main__':
    main()
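Where the comment says "put your dispatch logic here", you would call whatever upload routine you use for a batch. Purely as an illustration (the endpoint, the use of the third-party requests library, and the assumption that convert_file actually writes an output file you can open are mine, not part of the answer above), it might look something like:

import requests  # third-party: pip install requests

UPLOAD_URL = 'https://example.com/upload'  # hypothetical endpoint

def dispatch(filenames):
    """Upload a batch of converted files to the server (sketch only)."""
    for name in filenames:
        with open(name, 'rb') as fh:
            response = requests.post(UPLOAD_URL, files={'file': fh})
            response.raise_for_status()

Inside main() you would then call dispatch(completed) at the point marked by the comment.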
Upvotes: 4
Reputation: 1567
You could use a shared variable across your processes that keeps track of the converted files. You can find an example here.
The variable is locked whenever a process reads or writes it; while the lock is held, all other processes that want to access the variable have to wait. So you can poll the variable in the main loop and check whether it has reached 20, while the converting processes keep incrementing it. As soon as the value reaches 20, you reset it and upload the files to your server.
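A minimal sketch of that idea, assuming a counter shared via multiprocessing.Value and a batch size of 20 (the file names, pool size and sleep times are placeholders). Incrementing a Value with += is not atomic, so the workers take its lock explicitly; also note that a bare counter only tells you how many files have finished, not which ones, so a real implementation would also need a shared list or queue of finished filenames:

from multiprocessing import Pool, Value
import time

counter = None  # shared counter; set in each worker by the initializer


def init_worker(shared_counter):
    global counter
    counter = shared_counter


def convert_file(filename):
    time.sleep(0.1)  # stand-in for the real conversion work
    with counter.get_lock():  # += on a Value is not atomic
        counter.value += 1


def main():
    files = ['file{}.txt'.format(i) for i in range(100)]
    shared_counter = Value('i', 0)

    with Pool(4, initializer=init_worker, initargs=(shared_counter,)) as pool:
        result = pool.map_async(convert_file, files)
        while not result.ready():
            with shared_counter.get_lock():
                if shared_counter.value >= 20:
                    shared_counter.value = 0
                    print('another 20 files converted, uploading batch...')
            time.sleep(0.05)

    print('all conversions finished; upload any remaining files here')


if __name__ == '__main__':
    main()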
Upvotes: 0