Animesh Pandey

Reputation: 6018

Processing a long list using Multiprocessing

import multiprocessing as mp

output = mp.Queue()

def endScoreList(all_docs, query, pc, output):
    score_list = []
    for doc in all_docs:
        print "In process", pc
        score_list.append(some_score(doc, query))
        print "size of score_list is", len(score_list)
    output.put((doc, score_list))

if __name__ == '__main__':
    mp.freeze_support()
    num_of_workers = mp.cpu_count()
    doc_list = getDocuments(query)
    ## query is a list of strings.
    ## doc_list is a list of document names
    processes = [mp.Process(target = endScoreList, args = (doc_list, x, query, output)) for x in range(num_of_workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    results = [output.get() for p in processes]
    print results

I have a list of document names all_docs whose data I have to compare against an input query, using the score I get from some_score(doc, query). The list of documents is ~100k long, and I have to get scores for all of them. How can I write the program so that the scores are generated in parallel? The scores are independent of each other, so in the end I just have to merge all the returned (doc, score) pairs. I tried to make a program, but I don't think it is running in parallel.

Please help me out.

I am using 64-bit Windows on an i7.

Upvotes: 1

Views: 1772

Answers (1)

Blckknght

Reputation: 104802

It's a bit hard to pin down exactly what is going wrong with your current code, as the example you've shown has a number of issues. For instance, the argument order in args = (doc_list, x, query, output) doesn't match the signature endScoreList(all_docs, query, pc, output); every process loops over the entire doc_list instead of its own share of it, so the work is duplicated rather than divided; and output.put((doc, score_list)) runs after the loop, so it records only the last document's name.
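If you do want to keep the explicit Process/Queue approach, a repaired sketch might look like the following. This is only one way to do it (some_score here is just a stub for your real scoring function, and the sample query and document names are placeholders); the key changes are giving each worker its own slice of doc_list and putting the whole list of (doc, score) pairs on the queue:

import multiprocessing as mp

def some_score(doc, query):
    # stub standing in for your real scoring function
    return len(doc) + len(query)

def endScoreList(docs, query, output):
    # score only this worker's slice of the documents
    output.put([(doc, some_score(doc, query)) for doc in docs])

if __name__ == "__main__":
    mp.freeze_support()
    query = ["some", "query", "terms"]
    doc_list = ["doc_%d" % i for i in range(100)]  # stand-in for your ~100k names
    n = mp.cpu_count()
    output = mp.Queue()
    chunks = [doc_list[i::n] for i in range(n)]  # one interleaved slice per worker
    processes = [mp.Process(target=endScoreList, args=(chunk, query, output))
                 for chunk in chunks]
    for p in processes:
        p.start()
    # drain the queue before joining; a child blocked on a full pipe never exits
    results = [pair for p in processes for pair in output.get()]
    for p in processes:
        p.join()
    print(results)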

Rather than patching up the Process-based version further, though, I'd like to suggest an alternative solution that is likely to be much simpler. If you use multiprocessing.Pool's map method, you'll get your work distributed over however many processes are in the pool.

import multiprocessing as mp

def worker(doc):
    # score one document against the (constant) query string
    return doc, some_score(doc, "query")

if __name__ == "__main__":
    mp.freeze_support()
    p = mp.Pool() # default is a number of processes equal to the number of CPU cores
    scores = p.map(worker, all_docs) # list of (doc, score) pairs, in input order
    p.close()
    p.join()

This simple version assumes that the query string is a constant. If that's not the case, you could pass it as an argument in the map call (or consider using starmap instead).
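For example, here is a minimal sketch of the starmap version. Note that Pool.starmap requires Python 3.3+, and some_score and the sample data below are again just stand-ins:

import multiprocessing as mp

def some_score(doc, query):
    # stub standing in for your real scoring function
    return len(doc) + len(query)

def worker(doc, query):
    return doc, some_score(doc, query)

if __name__ == "__main__":
    mp.freeze_support()
    query = ["some", "query", "terms"]
    all_docs = ["doc_%d" % i for i in range(100)]  # stand-in for your ~100k names
    p = mp.Pool()
    # starmap unpacks each (doc, query) tuple into worker's two arguments
    scores = p.starmap(worker, [(doc, query) for doc in all_docs])
    p.close()
    p.join()
    print(scores)

On Python 2, which your print statements suggest you're using, Pool has no starmap; the usual workaround there is a worker that takes a single (doc, query) tuple and unpacks it itself, passed through plain map.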

Upvotes: 2
