Queuebee

Reputation: 670

Can a python pool worker return values from initialization?

TL;DR I want to collect the accumulated data in the globals of each worker when the pool is finished processing

Description of what I think I'm missing

As I'm new to multiprocessing, I don't know all the features that exist. I am looking for a way to make a worker return the value it was initialized with (after manipulating that value many millions of times). Then I hope I can collect and merge all these values at the end of the program, when all the 'jobs' are done.

import multiprocessing as mp
from collections import defaultdict, Counter
from customtools import load_regexes #, . . .
import gzip
import nltk

result_dict = None
regexes = None

def create_worker():
    global result_dict
    global regexes
    result_dict = defaultdict(Counter) # I want to return this at the end
    # these are a bunch of huge regexes
    regexes = load_regexes()

These functions represent the way I load and process data. The data is a big gzip file with articles.

def load_data(semaphore):
    with gzip.open('some10Gbfile') as f:
        for line in f:
            semaphore.acquire()
            yield str(line, 'utf-8')

def worker_job(line):
    global regexes
    global result_dict
    hits = defaultdict(Counter)
    for sent in nltk.sent_tokenize(line[3:]):
        for rename, regex in regexes.items():
            for hit in regex.finditer(sent):
                hits[rename][hit.group(0)]+=1
    # and more and more... filtered_hits = _filter(_extract(hits))
    # store some data in result_dict here . . .
    return filtered_hits

class ResultEater:
    def __init__(self):
        self.wordscounts=defaultdict(Counter)
        self.filtered=Counter()

    def eat_results(self, filtered_hits):
        for k, v in filtered_hits.items():
            for i, c in v.items():
                self.wordscounts[k][i] += c

This is the main program

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count(), initializer=create_worker)

    semaphore = mp.Semaphore(50)
    loader = load_data(semaphore)
    
    results = ResultEater()
    for intermediate_result in pool.imap_unordered(worker_job, loader, chunksize=10):
        results.eat_results(intermediate_result)
        semaphore.release()

    # results.eat_workers(the_leftover_workers_or_something)
    results.print()

Upvotes: 1

Views: 256

Answers (1)

Aaron

Reputation: 11075

I'm not really sure I understand why returning the data incrementally isn't sufficient, but it seems like you need some sort of finalization function to send back the data, similar to how you have an initialization function. Unfortunately, I don't think this sort of thing exists for mp.Pool, so it'll require you to use a couple of mp.Process objects, sending the input args and returning the results with a couple of mp.Queue objects.

On a side note, your use of a Semaphore is unnecessary, as the call to the "load_data" iterator always happens in the main process. I have moved that into a separate "producer" process, which puts inputs on a queue; the queue is already synchronized automatically by default. This allows you to have one process for gathering inputs, several processes for turning inputs into outputs, and leaves the main (parent) process to gather the outputs. If the "producer" generating the inputs is IO limited by file read speed (very likely), it could also be a thread rather than a process, but in this case the difference is probably minimal.

I have created an example of a custom "Pool" which allows you to return some data at the end of each worker's "life", using the aforementioned "producer-consumer" scheme. There are print statements to track what is happening in each process, but please also read the comments to track what's going on and why:

import multiprocessing as mp
from time import sleep
from queue import Empty

class ExitFlag:
    def __init__(self, exit_value=None):
        self.exit_value = exit_value #optionally pass value along with exit flag

def producer_func(input_q, n_workers):
    for i in range(100): #100 lines of some long file
        print(f"put {i}")
        input_q.put(i) #put each line of the file to the work queue
    print('stopping consumers')
    for i in range(n_workers):
        input_q.put(ExitFlag()) #send shut down signal to each of the workers
    print('producer exiting')    
        
def consumer_func(input_q, output_q, work_func):
    counter = 0
    while True:
        try:
            item = input_q.get(timeout=0.1) #never wait forever on a "get". It's a recipe for deadlock.
        except Empty:
            continue
        print(f"get {item}")
        if isinstance(item, ExitFlag):
            break
        else:
            counter += 1
            output_q.put(work_func(item))
    output_q.put(ExitFlag(exit_value=counter))
    print('consumer exiting')
    
def work_func(number):
    sleep(.1) #some heavy nltk work...
    return number*2

if __name__ == '__main__':
    input_q = mp.Queue(maxsize=10) #only bother limiting size if you have memory usage constraints
    output_q = mp.Queue(maxsize=10)
    
    n_workers = mp.cpu_count()
    
    producer = mp.Process(target=producer_func, args=(input_q, n_workers)) #generate the input from another process. (this could just as easily be a thread as it seems it will be IO limited anyway)
    producer.start()
    
    consumers = [mp.Process(target=consumer_func, args=(input_q, output_q, work_func)) for _ in range(n_workers)]
    for c in consumers: c.start()
    
    total = 0
    stop_signals = 0
    exit_values = []
    while True:
        try:
            item = output_q.get(timeout=0.1)
        except Empty:
            continue
        if isinstance(item, ExitFlag):
            stop_signals += 1
            if item.exit_value is not None:
                exit_values.append(item.exit_value) #do something with the return at the end
            if stop_signals >= n_workers: #stop waiting for more results once all consumers finish
                break
        else:
            total += item #do something with the incremental return values
    print(total)
    print(exit_values) 

    #cleanup
    producer.join()
    print("producer joined")
    for c in consumers: c.join()
    print("consumers joined")

Upvotes: 1
