night-crawler

Reputation: 1469

Python multiprocessing map_async hangs

I'm having some trouble (probably) with closing a pool of processes in my parser. When all the tasks are done, it hangs and does nothing; CPU usage is about 1%.

import multiprocessing
import sys

profiles_pool = multiprocessing.Pool(processes=4)
pages_pool = multiprocessing.Pool(processes=4)

m = multiprocessing.Manager()
users = m.dict()        # parsed profile data; filled in by the map_async callback
pages = m.list(['URL'])
pages_done = m.list()

while True:
    # grab all links
    res = pages_pool.imap_unordered(deco_process_people, pages, chunksize=1)

    pages_done += pages
    pages = []

    for new_users,new_pages in res:
        users.update(new_users)
        profile_tasks = [ (new_users[i]['link'],i) for i in new_users ]
        # enqueue grabbed links for parsing
        profiles_pool.map_async(deco_process_profiles, 
                                profile_tasks, chunksize=2, 
                                callback=profile_update_callback)
        # I don't actually need the result of map_async;
        # the callback will apply the parsed data to the users dict
        # users dict is an instance of Manager.dict()

        for p in new_pages:
            if p not in pages_done and p not in pages:
                pages.append(p)

    # more than 900 pages need to be parsed for the bug to occur
    #if len(pages) == 0:
    if len(pages_done) > 900:
        break


# 
# closing other pools
#

# ---- the last printed string: 
print 'Closing profiles pool',
sys.stdout.flush()
profiles_pool.close()
profiles_pool.join()
print 'closed'

I guess the problem is an incorrect count of open tasks in the pool's queue, but I'm not sure and can't check this: I don't know how to get the task queue length.
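
The closest I can get is poking at the pool's internal _taskqueue, which is a CPython implementation detail rather than a public API, so I don't know whether it can be trusted:

# _taskqueue is an undocumented internal; this only shows queued task batches
print('pending profile task batches: %d' % profiles_pool._taskqueue.qsize())
print('pending page task batches: %d' % pages_pool._taskqueue.qsize())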

What could it be, and where should I look first?

Upvotes: 2

Views: 3510

Answers (2)

night-crawler

Reputation: 1469

I've found the cause of the bug: "join method of multiprocessing Pool object hangs if iterable argument of pool.map is empty"

http://bugs.python.org/issue12157
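
For reference, a minimal sketch of that failure mode (not code from my parser); on an affected Python version the final join() never returns:

import multiprocessing

def work(x):
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    # An empty iterable is what triggers issue 12157:
    # the pool finishes no tasks, but join() still hangs.
    pool.map_async(work, [])
    pool.close()
    pool.join()
    print('done')   # never reached on affected versions

In my loop this presumably happens when profile_tasks comes back empty, so guarding the call (if profile_tasks: profiles_pool.map_async(...)) avoids feeding the pool an empty iterable.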

Upvotes: 1

torek

Reputation: 488203

The most immediately obvious problem is that pages_done is a synchronized Manager.list object (so each process can access it atomically), but while pages starts out as one such, it quickly becomes an ordinary un(multi)processed list:

pages_done += pages
pages = []

The second quoted line rebinds pages to a new, empty ordinary list.
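
To make the distinction concrete, here is a small sketch (reusing the question's names, not the original code):

import multiprocessing

if __name__ == '__main__':
    m = multiprocessing.Manager()
    pages = m.list(['URL'])

    # Rebinding: 'pages' now names a plain local list; the shared proxy is dropped.
    pages = []

    # Clearing in place keeps the shared Manager list, just empties it.
    pages = m.list(['URL'])
    del pages[:]
    print(len(pages))   # 0, and pages is still a ListProxy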

Even if you deleted all the elements of pages on the second line (rather than doing a rebinding assignment), you could run into a race where (e.g.) pages had A, B, and C in it when you did the += on the first line, but had become A, B, C, and D by the second.

A quick fix would be to take items off pages one at a time and put them into pages_done one at a time (not very efficient). It might be better not to make these shared data structures at all, though; it doesn't look like they need to be, in the quoted code (I'm assuming some unquoted code depends on them being shared, since otherwise the rebinding of pages is a red herring anyway!).
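
A sketch of that item-by-item transfer, assuming pages and pages_done both stay Manager lists (the list proxy supports pop and append):

# drain the shared 'pages' list into 'pages_done' without rebinding either name
while len(pages) > 0:
    pages_done.append(pages.pop(0))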

Upvotes: 1
