Reputation: 15804
I have a dictionary of folder names that I would like to process in parallel. Under each folder, there is an array of file names that I would like to process in series:
folder_file_dict = {
    folder_name : {
        file_names_key : [file_names_array]
    }
}
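For example, a concrete instance might look like this (all names here are made up):
folder_file_dict = {
    "folder_a": {"file_names_key": ["file_1.txt", "file_2.txt"]},
    "folder_b": {"file_names_key": ["file_3.txt"]},
}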
Ultimately, I will be creating a folder named folder_name, which will contain len(folder_file_dict[folder_name][file_names_key]) files. I have a method like so:
def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)
        # create "file_name"

udp_ports = [123, 456, 789]
Note the time_consuming_method() above, which takes a long time due to calls over a UDP port. I am also limited to using the UDP ports in the array above. Thus, I have to wait for time_consuming_method to complete on a UDP port before I can use that UDP port again. This means that I can only have len(udp_ports) threads running at a time.
Thus, I will ultimately create len(folder_file_dict.keys()) threads, with len(folder_file_dict.keys()) calls to process_files_in_series. I also have a MAX_THREAD count. I am trying to use the Queue and threading modules, but I am not sure what kind of design I need. How can I do this using queues and threads, and possibly conditions as well? A solution that uses a thread pool may also be helpful.
NOTE
I am not trying to increase the read/write speed. I am trying to parallelize the calls to time_consuming_method under process_files_in_series. Creating these files is just part of the process, but not the rate-limiting step.
Also, I am looking for a solution that uses the Queue and threading modules, and possibly Condition objects, or anything relevant to those modules. A thread pool solution may also be helpful. I cannot use processes, only threads.
I am also looking for a solution in Python 2.7.
Upvotes: 0
Views: 591
Reputation: 1661
This is kind of a blueprint for how you could use multiprocessing.Process with JoinableQueue's to deliver jobs to workers. You will still be bound by I/O, but with Process you do have true concurrency, which may prove useful, since threading may even be slower than a normal script processing the files.
(Be aware that this will also prevent you from doing anything else with your laptop if you dare to start too many processes at once :P).
I tried to explain the code as much as possible with comments.
import traceback
from multiprocessing import Process, JoinableQueue, cpu_count

# Number of CPUs on your PC
cpus = cpu_count()

# The worker function. Could also be modelled as a class.
def Worker(q_jobs):
    while True:
        # try/except/finally may be necessary for error-prone tasks, since the
        # processes may hang forever if the task_done() method is not called.
        try:
            # Get an item from the queue
            item = q_jobs.get()
            # At this point the data should somehow be processed
            print(item)
        except:
            traceback.print_exc()
        else:
            pass
        finally:
            # Inform the queue that the task has been done.
            # Without this the processes can not be killed
            # and will be left as zombies afterwards.
            q_jobs.task_done()

# A joinable queue to end the processes
q_jobs = JoinableQueue()

# Create processes depending on the number of CPUs
for i in range(cpus):
    # Target function and arguments.
    # A tuple of multiple arguments should not end with ','
    # e.g. (q_jobs, 'bla'), but a single-argument tuple needs the
    # trailing comma: (q_jobs,)
    p = Process(target=Worker,
                args=(q_jobs,)
                )
    p.daemon = True
    p.start()

# Fill the queue with jobs
q_jobs.put(['Do'])
q_jobs.put(['Something'])

# Wait until all jobs are done
q_jobs.join()
Cheers
EDIT
I wrote this with Python 3 in mind. Removing the parentheses from the print function, i.e.
print item
should make this work for 2.7.
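For example, assuming each item you put on the queue is a (file_names_array, udp_port) tuple, the worker could call your process_files_in_series directly (a minimal sketch, not tested against your setup):
def Worker(q_jobs):
    while True:
        try:
            # unpack the job and run the serial processing for one folder
            file_names_array, udp_port = q_jobs.get()
            process_files_in_series(file_names_array, udp_port)
        except:
            traceback.print_exc()
        finally:
            q_jobs.task_done()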
Upvotes: 0
Reputation: 414315
Using a thread pool:
#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)
        # create "file_name"
        ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)

pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print args, error
I don't think you need to bind the number of threads to the number of udp ports if the processing time may differ for different filenames arrays.
If I understand the structure of folder_file_dict correctly, then to generate the filenames arrays:
def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
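The "use initializer to pass it to children" comment above refers to handing the queue to workers explicitly at pool creation instead of relying on a module-level global; a minimal sketch of that pattern (init is a name I made up):
def init(ports_queue):
    # make the queue visible as a global inside each worker
    global free_udp_ports
    free_udp_ports = ports_queue

pool = Pool(number_of_concurrent_jobs,
            initializer=init, initargs=(free_udp_ports,))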
Upvotes: 1
Reputation: 77347
Use the multiprocessing.pool.ThreadPool. It handles queue / thread management for you and can be easily changed to do multiprocessing instead.
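For example, only the pool class needs to change to switch between threads and processes (a minimal sketch; work and jobs are placeholders for your function and its inputs):
from multiprocessing.pool import ThreadPool  # thread-based pool
# from multiprocessing import Pool           # process-based, same interface

pool = ThreadPool(4)            # swap in Pool(4) to use processes instead
results = pool.map(work, jobs)  # identical API for both pool types
pool.close()
pool.join()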
EDIT: Added example
Here's an example... multiple threads may end up using the same udp port. I'm not sure if that's a problem for you.
import multiprocessing.pool
import itertools

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)
        # create "file_name"

udp_ports = [123, 456, 789]

folder_file_dict = {
    folder_name : {
        file_names_key : [file_names_array]
    }
}

def main(folder_file_dict, udp_ports):
    # number of threads - here I'm limiting to the smaller of udp_ports,
    # file lists to process and a cap I arbitrarily set to 4
    num_threads = min(len(folder_file_dict), len(udp_ports), 4)
    # the pool
    pool = multiprocessing.pool.ThreadPool(num_threads)
    # build files to be processed into a list. You may want to do other
    # things like join folder_name...
    file_arrays = [value['file_names_key'] for value in folder_file_dict.values()]
    # do the work; the lambda unpacks each (file_names_array, udp_port)
    # tuple because pool.map passes a single argument to the worker
    pool.map(lambda args: process_files_in_series(*args),
             zip(file_arrays, itertools.cycle(udp_ports)))
    pool.close()
    pool.join()
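A usage sketch, assuming the placeholders above are filled in with real data:
if __name__ == '__main__':
    main(folder_file_dict, udp_ports)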
Upvotes: 0