JohnD

Reputation: 11

Python threading return values

I am new to threading and I have an existing application that I would like to make a little quicker using threading.

I have several functions that return to a main Dict and would like to send these to separate threads so that they run at the same time rather than one at a time.

I have done a little googling but I can't seem to find something that fits my existing code, so I could use a little help.

I have around six functions that return to the main Dict like this:

parsed['cryptomaps'] = pipes.ConfigParse.crypto(parsed['split-config'], parsed['asax'], parsed['names'])

The issue here is with the return values. I understand that I would need to use a queue for this, but would I need a queue for each of these six functions or one queue for all of them? If it is the latter, how would I separate the returns from the threads and assign them to the correct Dict entries?

Any help on this would be great.

John

Upvotes: 1

Views: 1510

Answers (1)

Zbyszek Mandziejewicz

Reputation: 51

You can push tuples of (worker, data) to the queue to identify the source. Also, please note that due to the Global Interpreter Lock, Python threading is not very useful for CPU-bound work. I suggest taking a look at the multiprocessing module, which offers an interface very similar to threading but will actually scale with the number of workers.
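
For example, here is a minimal sketch of the single-queue idea with plain threads, built around the call from your question (pipes.ConfigParse.crypto and parsed come from your snippet; the rest is illustrative):

import threading
import queue

results = queue.Queue()

def run(key, func, *args):
    # tag each return value with the Dict key it belongs to
    results.put((key, func(*args)))

threads = [
    threading.Thread(target=run, args=(
        'cryptomaps', pipes.ConfigParse.crypto,
        parsed['split-config'], parsed['asax'], parsed['names'])),
    # ... one Thread per function, each with its own key
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# every producer has finished, so the queue can be drained safely
while not results.empty():
    key, value = results.get()
    parsed[key] = value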

Edit:

Code sample.

import multiprocessing as mp

# Python 2/3 compatibility: the Empty exception lives in the
# Queue module on Python 2 and in queue on Python 3
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


data = [
    # input data
    # {split_config: ... }
]

def crypto(split_config, asax, names):
    # your code here
    pass

def worker(id, terminate, input, output):
    # use the event to exit gracefully; Process.terminate
    # would leave the queues in an undefined state
    while not terminate.is_set():
        try:
            x = input.get(True, timeout=1)
            output.put((id, crypto(**x)))
        except queue.Empty:
            pass

if __name__ == "__main__":
    terminate = mp.Event()
    input = mp.Queue()
    output = mp.Queue()

    workers = [mp.Process(target=worker, args=(i, terminate, input, output))
               for i in range(mp.cpu_count())]
    for w in workers:
        w.start()

    for x in data:
        input.put(x)

    # collect one tagged (id, result) pair per input item *before*
    # signalling termination, otherwise workers may exit with
    # items still waiting in the input queue
    results = [output.get() for x in data]

    # terminate workers
    terminate.set()

    # the output queue is emptied above; a queue that still holds
    # items can make Process.join deadlock

    for w in workers:
        w.join()

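As a side note, if the six calls are completely independent, multiprocessing.Pool can replace the hand-rolled queue handling; a sketch under the same assumptions (jobs is a hypothetical mapping of Dict keys to argument tuples, and crypto stands in for your six functions):

import multiprocessing as mp

if __name__ == "__main__":
    jobs = {
        # 'cryptomaps': (split_config, asax, names),
        # ... one entry per function call
    }
    pool = mp.Pool(processes=mp.cpu_count())
    # submit every job, keyed by the Dict entry it should fill
    async_results = {key: pool.apply_async(crypto, args)
                     for key, args in jobs.items()}
    pool.close()
    # get() blocks until each result is ready
    parsed = {key: r.get() for key, r in async_results.items()}
    pool.join()
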
Upvotes: 1
