Tytire Recubans

multiprocessing exits before doing any work

I have inherited a parser that is supposed to parse 10 files of ~4M lines each.

The code was written in Python 2, and I have updated it to Python 3.

There is some multiprocessing logic that I just can't get to work.

from multiprocessing.pool import ThreadPool
import glob

DATADIR = 'home/my_dir/where/all/my/files/are'

def process_file(filepath):
    # read line by line, parse and insert into the PostgreSQL database.
    ...

def process_directory(dirpath):
    pattern = f'{dirpath}/*dat'  # files have .dat extension. 
    tp = ThreadPool(10)
    for filepath in glob.glob(pattern):
        print(filepath)
        tp.apply_async(process_file, filepath)
    tp.close()
    tp.join()

if __name__ == '__main__':
    process_directory(DATADIR)

I have gone through a lot of the documentation and some similar questions, but I still can't get it to work.

When I run it, the paths of all the files I need parsed are printed to the console, but after that the program doesn't do anything else.

Answers (1)

dano

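The problem is in the way you're calling apply_async. apply_async returns immediately, and any exception raised by the task is stored in the AsyncResult it returns; it is only re-raised when you call .get() on that result. Your script never calls .get(), so every task fails silently and the program appears to do nothing. I made a simple reproducer of your problem, with a slight tweak to get the result from each call: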

from multiprocessing.pool import ThreadPool

def func(f):
    print("hey " + f)
    return f + "1"

l = ["name", "name2", "name3"]
pool = ThreadPool(3)
out = []
for a in l:
    print(a)
    out.append(pool.apply_async(func, a))

# Check the response from each `apply_async` call
for a in out:
    a.get()

pool.close()
pool.join()

This returns an error:

Traceback (most recent call last):
  File "a.py", line 16, in <module>
    a.get()
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
TypeError: func() takes 1 positional argument but 4 were given

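It thinks you're passing four positional arguments instead of one. That's because apply_async(func, args) calls func(*args), so it expects all the arguments in a tuple. A plain string is iterable, so "name" gets unpacked into its four characters. Pass a one-element tuple instead, like this: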

pool.apply_async(func, (a,))
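
To see both halves of the problem in isolation, here is a small sketch built on the reproducer's func. It shows that a bare string passed as args is unpacked into its characters, and that the resulting TypeError stays inside the AsyncResult until you ask for it (wait(), successful() and get() are standard AsyncResult methods; the names bad and good are just for illustration):

```python
from multiprocessing.pool import ThreadPool


def func(f):
    return f + "1"


with ThreadPool(1) as pool:
    # args="name" is unpacked, so the worker calls func('n', 'a', 'm', 'e')
    bad = pool.apply_async(func, "name")
    # args=("name",) is unpacked, so the worker calls func("name")
    good = pool.apply_async(func, ("name",))
    # Wait for both tasks; wait() never raises, even if the task failed.
    bad.wait()
    good.wait()

# No error has been printed so far: the TypeError is stored inside `bad`.
print(bad.successful())  # False
print(good.get())        # name1
try:
    bad.get()  # only now is the worker's exception re-raised
except TypeError as exc:
    print("TypeError:", exc)
```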

If you put filepath in a tuple when you call apply_async, i.e. tp.apply_async(process_file, (filepath,)), I think you'll get the behavior you expect.
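
Applied to your process_directory, a minimal sketch might look like the following. It assumes a hypothetical process_file that just counts lines (standing in for your parse-and-insert code) and matches '*.dat' via os.path.join. It also keeps the AsyncResult objects and calls .get() on each, so a failing file raises instead of being silently dropped:

```python
import glob
import os
import tempfile
from multiprocessing.pool import ThreadPool


def process_file(filepath):
    # Hypothetical stand-in for the real parser: count the lines instead of
    # parsing them and inserting them into PostgreSQL.
    with open(filepath) as f:
        return sum(1 for _ in f)


def process_directory(dirpath, workers=10):
    pattern = os.path.join(dirpath, '*.dat')
    with ThreadPool(workers) as tp:
        # The single argument goes in a one-element tuple: (filepath,)
        results = [tp.apply_async(process_file, (filepath,))
                   for filepath in glob.glob(pattern)]
        # .get() waits for each task and re-raises any exception the worker
        # hit, so a failing file is no longer silently ignored.
        return [r.get() for r in results]


if __name__ == '__main__':
    # Demo on a throwaway directory holding two small .dat files.
    with tempfile.TemporaryDirectory() as d:
        for name, n in [('a.dat', 3), ('b.dat', 5)]:
            with open(os.path.join(d, name), 'w') as f:
                f.write('line\n' * n)
        print(sorted(process_directory(d)))  # [3, 5]
```

The results come back in glob order, which is arbitrary, hence the sorted() in the demo.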

It's also worth noting that your use case is well suited to pool.map instead of apply_async. It is a little more succinct, and it blocks until every file has been processed and re-raises an exception if any worker raised one:

pool.map(process_file, glob.glob(pattern))
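
A sketch of the map-based version, again using a hypothetical line-counting process_file in place of the real parser. map passes each path as the single argument, so no tuple is needed, and it returns results in the same order as its input. That makes it easy to pair each path with its result (the dict return value is just an illustration):

```python
import glob
import os
import tempfile
from multiprocessing.pool import ThreadPool


def process_file(filepath):
    # Hypothetical stand-in for the real parser: return the line count.
    with open(filepath) as f:
        return sum(1 for _ in f)


def process_directory(dirpath, workers=10):
    paths = glob.glob(os.path.join(dirpath, '*.dat'))
    with ThreadPool(workers) as tp:
        # map calls process_file(path) once per path, blocks until all are
        # done, keeps input order, and re-raises a worker's exception.
        counts = tp.map(process_file, paths)
    return dict(zip(paths, counts))


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as d:
        for name, n in [('a.dat', 3), ('b.dat', 5)]:
            with open(os.path.join(d, name), 'w') as f:
                f.write('line\n' * n)
        for path, count in sorted(process_directory(d).items()):
            print(os.path.basename(path), count)
```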
