Joshelin

Reputation: 41

Multiprocessing Pool much slower than manually instantiating multiple Processes

I'm reading a big file in chunks, loading each chunk into memory as a list of lines, then running a task on every line.

The sequential solution was taking too long so I started looking at how to parallelize it.

The first solution I came up with uses Process, managing each subprocess's slice of the list.

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # read() expects an int, not a string
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = -(-piece_list_len // N_PROCESSES)  # ceiling division so the last slice isn't cut short
        start = 0
        for process in range(N_PROCESSES):
            finish = start + item_delta
            # work(lines) is defined elsewhere and expects a list of lines;
            # note the trailing comma so args is a one-element tuple
            p = mp.Process(target=work, args=(piece_list[start:finish],))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

It completes each chunk in roughly 2498ms.

Then I discovered Pool, which manages the slices automatically.

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # read() expects an int, not a string
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            # here work receives a single line per call
            pool.map(work, piece_list)

It completes each chunk in roughly 15540ms, 6 times slower than manual but still faster than sequential.

Am I using the Pool wrong? Is there a better or faster way to do this?

Thank you for reading.

Update

The Pool has quite a bit of overhead, as Hannu suggested.

The work function called in the Process approach expects a list of lines.

The work function called in the Pool approach expects a single line, because of how the Pool decides the slices.
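
To make the difference concrete, the two variants have roughly these signatures (a hypothetical sketch; process_line and the function names are made up for illustration):

def process_line(line):
    return len(line)  # placeholder for the real per-line work

def work_for_process(lines):   # Process version: receives a slice (list) of lines
    return [process_line(line) for line in lines]

def work_for_pool(line):       # Pool.map version: receives one line per call
    return process_line(line)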

I'm not quite sure how to make the Pool give a given worker more than one line at a time.

That should solve the problem, right?

Update 2

Final question, is there a 3rd better way to do it?

Upvotes: 0

Views: 1125

Answers (3)

Joshelin

Reputation: 41

Oh boy! This was quite a ride to figure out, but very fun nonetheless.

Pool.map was getting, pickling and passing every item from the iterator to the workers individually. Once a worker is done, rinse and repeat: get -> pickle -> pass. This creates a noticeable overhead cost.

This is actually intended, because Pool.map isn't smart enough to know the length of the iterator, nor is it able to effectively make a list of lists and pass each inner list (chunk) to a worker.

But, it can be helped. Simply transforming the list to a list of chunks (lists) with a list comprehension works like a charm and reduces the overhead to the same level as the Process method.

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # read() expects an int, not a string
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            piece_list_len = len(piece_list)
            item_delta = max(1, round(piece_list_len / N_PROCESSES))  # guard against a zero step on tiny pieces
            # each worker now receives a list of lines instead of a single line
            pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])

This Pool, fed a list of lists, has exactly the same running time as the Process method.
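
For completeness, and untested here: Pool.map also accepts a chunksize argument, which batches how many items get pickled and shipped to a worker per dispatch. Assuming the per-line work from the original Pool version, something like this should cut the same overhead without building the list of lists by hand:

# Untested sketch: chunksize batches the transport only;
# work still gets called once per line.
pool.map(work, piece_list, chunksize=item_delta)

The difference from the list-of-lists approach is that work keeps its one-line-per-call signature.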

Upvotes: 1

Hannu

Reputation: 12205

I am not entirely sure about this, but it appears to me that your programs are materially different in what they submit to the workers.

In your Process method you seem to be submitting a large chunk of rows:

p = mp.Process(target=work, args=(piece_list[start:finish],))

but then when you use Pool, you do this:

for piece in read_in_chunks(file, CHUNKSIZE):
    piece_list = piece.splitlines()
    pool.map(work, piece_list)

You read your file in chunks, but when you then use splitlines, your piece_list iterable submits units of one line.

Which means that in your Process approach you submit as many subtasks as you have CPUs, but in your Pool approach you submit as many tasks as your source data has lines. If you have a lot of lines, this creates massive orchestration overhead in your Pool: each worker processes only one line at a time, finishes, returns the result, and the Pool then submits another line to the newly freed worker.

If this is what is going on here, it definitely explains why Pool takes much longer to complete.

What happens if you use your reader as the iterable and skip the line-splitting part?

pool.map(work, read_in_chunks(file, CHUNKSIZE))
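
In that case each worker would receive a raw chunk of text rather than a list of lines, so the worker would have to split the lines itself. A minimal, untested sketch, assuming work is the list-of-lines variant from your Process approach and reusing the names from your question:

# The worker splits the chunk into lines itself before doing the real work.
def work_on_chunk(chunk):
    return work(chunk.splitlines())

pool.map(work_on_chunk, read_in_chunks(file, CHUNKSIZE))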

Upvotes: 1

Amirul Akmal

Reputation: 411

I do not know if this is going to work, but could you try this?

if __name__ == "__main__":
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            for piece in read_in_chunks(file, CHUNKSIZE):
                piece_list = piece.splitlines()
            pool.map(work, piece_list)  # map is called once, after the loop

My reasoning:
1. pool.map() only needs to be called once, and your code calls it in a loop
2. My guess is that the loop makes it slower
3. Because parallel processing should be faster hehe

Upvotes: 0
