Reputation: 11
I am new to threading and I have an existing application that I would like to make a little quicker using threading.
I have several functions that each return into a main Dict, and I would like to send these to separate threads so that they run at the same time rather than one at a time.
I have done a little googling but I can't seem to find something that fits my existing code, and I could use a little help.
I have around six functions that return to the main Dict like this:
parsed['cryptomaps'] = pipes.ConfigParse.crypto(parsed['split-config'], parsed['asax'], parsed['names'])
The issue here is with the return value. I understand that I would need to use a queue for this, but would I need a queue for each of these six functions or one queue for all of them? If it is the latter, how would I separate the returns from the threads and assign them to the correct Dict entries?
Any help on this would be great.
John
Upvotes: 1
Views: 1510
Reputation: 51
You can push tuples of (worker, data) to a single queue to identify the source of each result. Also please note that, due to the Global Interpreter Lock, Python threading does not speed up CPU-bound code. I suggest taking a look at the multiprocessing module, which offers an interface very similar to threading but will actually scale with the number of workers for CPU-bound work.
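To make the tagged-tuple idea concrete, here is a minimal sketch using plain threads and one shared queue. The helper run_tagged and the two parsing functions are hypothetical stand-ins, not part of the asker's pipes module; each worker puts a (key, result) tuple on the queue, and the key says which Dict entry the result belongs to.

```python
import queue
import threading


def run_tagged(tasks):
    """Run each (key, func, args) task in its own thread; return {key: result}."""
    results_q = queue.Queue()

    def worker(key, func, args):
        # Tag the result with its key so the consumer knows where it belongs.
        # If func raises, this thread dies and the key is simply missing.
        results_q.put((key, func(*args)))

    threads = [threading.Thread(target=worker, args=task) for task in tasks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All producers have finished, so draining until empty is safe here.
    results = {}
    while not results_q.empty():
        key, value = results_q.get()
        results[key] = value
    return results


# Hypothetical stand-ins for the real parsing functions.
def parse_cryptomaps(config, names):
    return [line for line in config if line.startswith("crypto")]


def parse_names(config):
    return [line for line in config if line.startswith("name")]


if __name__ == "__main__":
    config = ["crypto map A", "name 1.1.1.1 host", "interface x"]
    parsed = {"split-config": config}
    parsed.update(run_tagged([
        ("cryptomaps", parse_cryptomaps, (config, None)),
        ("names", parse_names, (config,)),
    ]))
    print(parsed)
```

One queue is enough for all six functions, because the key travels with the result. Threads only help here if the functions spend their time waiting on I/O.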
Edit:
Code sample.
import multiprocessing as mp

# queue.Empty is raised when Queue.get() times out
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

data = [
    # input data: one dict of keyword arguments per crypto() call
    # {"split_config": ..., "asax": ..., "names": ...}
]

def crypto(split_config, asax, names):
    # your code here
    pass

def worker(id, terminate, input, output):
    # defined at module level so it can be pickled when processes are spawned
    # use the event here to exit gracefully;
    # using Process.terminate would leave queues
    # in an undefined state
    while not terminate.is_set():
        try:
            # wake up every second to re-check the terminate event
            x = input.get(True, timeout=1)
            output.put((id, crypto(**x)))
        except queue.Empty:
            pass

if __name__ == "__main__":
    terminate = mp.Event()
    input = mp.Queue()
    output = mp.Queue()
    workers = [mp.Process(target=worker, args=(i, terminate, input, output))
               for i in range(mp.cpu_count())]
    for w in workers:
        w.start()
    for x in data:
        input.put(x)
    # process results: one (id, result) tuple comes back per input item
    # make sure that queues are emptied, otherwise Process.join can deadlock
    # (this blocks forever if crypto() raises inside a worker)
    results = [output.get() for _ in data]
    # terminate workers only after all results have been collected
    terminate.set()
    for w in workers:
        w.join()
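If managing the queues and the event by hand feels heavy, the standard library's concurrent.futures module is a different way to get the same result: each submitted call returns a future, and a small dict maps every future back to the Dict key it should fill. This is only a sketch; run_keyed and the two example functions are hypothetical names. It defaults to threads, and ProcessPoolExecutor can be passed instead for CPU-bound work, provided the functions are defined at module level and the call sits under the `if __name__ == "__main__":` guard.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_keyed(jobs, executor_cls=ThreadPoolExecutor):
    """jobs maps a result key to (func, args); returns {key: func(*args)}."""
    results = {}
    with executor_cls() as pool:
        # Remember which key each future belongs to.
        future_to_key = {
            pool.submit(func, *args): key
            for key, (func, args) in jobs.items()
        }
        for future in as_completed(future_to_key):
            # result() re-raises any exception raised inside the worker.
            results[future_to_key[future]] = future.result()
    return results


# Hypothetical stand-ins for the real parsing functions.
def parse_cryptomaps(config, names):
    return [line for line in config if line.startswith("crypto")]


def parse_names(config):
    return [line for line in config if line.startswith("name")]


if __name__ == "__main__":
    config = ["crypto map A", "name 1.1.1.1 host", "interface x"]
    parsed = {"split-config": config}
    parsed.update(run_keyed({
        "cryptomaps": (parse_cryptomaps, (config, None)),
        "names": (parse_names, (config,)),
    }))
    print(parsed)
```

Compared with the hand-rolled workers, there is no shutdown event to manage, and an exception in one function surfaces in the parent instead of silently killing a worker.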
Upvotes: 1