Reputation: 41
I'm reading a chunk from a big file, loading it in memory as a list of lines, then processing a task on every line.
The sequential solution was taking too long so I started looking at how to parallelize it.
The first solution I came up with is with Process and managing each subprocess' slice of the list.
import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # bytes read per chunk
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = round(piece_list_len / N_PROCESSES)
        start = 0
        for process in range(N_PROCESSES):
            finish = start + item_delta
            # each process gets its own slice of the lines
            p = mp.Process(target=work, args=(piece_list[start:finish],))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()
It completes each chunk in roughly 2498ms.
Then I discovered multiprocessing.Pool, which manages the slices automatically.
import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # bytes read per chunk
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            # each element of piece_list is a single line
            pool.map(work, piece_list)
It completes each chunk in roughly 15540ms, about 6 times slower than the manual Process version but still faster than sequential.
Am I using the Pool wrong? Is there a better or faster way to do this?
Thank you for reading.
Update
The Pool has quite a lot of overhead, as Hannu suggested.
The work function called by the Process method is expecting a list of lines.
The work function called by the Pool method is expecting a single line because of how the Pool is deciding the slices.
I'm not quite sure how to make the Pool give a worker more than one line at a time.
Would that solve the problem?
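To make the difference concrete, this is roughly the shape of the two variants (work_for_process, work_for_pool and process_line are hypothetical names standing in for my real per-line task, which isn't shown here):

# hypothetical names, just to illustrate the two shapes of the worker function
def work_for_process(lines):
    # Process version: receives a slice of lines
    for line in lines:
        process_line(line)

def work_for_pool(line):
    # Pool.map version: receives a single line per call
    process_line(line)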
Update 2
Final question, is there a 3rd better way to do it?
Upvotes: 0
Views: 1125
Reputation: 41
Oh boy! This was quite a ride to figure out, but very fun nonetheless.
Pool.map gets, pickles, and passes every item from the iterable to the workers individually. Once a worker is done, rinse and repeat: get -> pickle -> pass. This creates a noticeable overhead cost.
This is actually intended, because Pool.map isn't smart enough to know the length of the iterable, nor is it able to build a list of lists on its own and pass each inner list (chunk) to a worker.
But it can be helped. Simply transforming the list into a list of chunks (lists) with a list comprehension works like a charm and reduces the overhead to the same level as the Process method.
import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = 1000000  # bytes read per chunk
N_PROCESSES = mp.cpu_count()

def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            piece_list_len = len(piece_list)
            item_delta = round(piece_list_len / N_PROCESSES)
            # group the lines into chunks so each task carries many lines instead of one
            pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])
This Pool with a list-of-lists iterable has exactly the same running time as the Process method.
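As far as I can tell, Pool.map also accepts an optional chunksize argument that batches items before shipping them to a worker, which should cut the same pickling/transfer overhead without building the list of lists by hand. A minimal sketch, assuming a work function that takes a single line (the pool still calls it once per line; only the transfer is batched):

pool.map(work, piece_list, chunksize=item_delta)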
Upvotes: 1
Reputation: 12205
I am not entirely sure about this, but it appears to me that your programs are materially different in what they submit to the workers.
In your Process method you seem to be submitting a large chunk of rows:
p = mp.Process(target=work, args=(piece_list[start:finish],))
but then when you use Pool, you do this:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
You read your file in chunks, but then when you use splitlines, your piece_list iterable submits units of one line.
This means that in your Process approach you submit as many subtasks as you have CPUs, but in your Pool approach you submit as many tasks as your source data has lines. If you have a lot of lines, this creates massive orchestration overhead in the Pool, as each worker processes only one line at a time, finishes, returns the result, and the Pool then submits another line to the newly freed worker.
If this is what is going on here, it definitely explains why Pool takes much longer to complete.
What happens if you use your reader as the iterable and skip the line splitting part:
pool.map(work, read_in_chunks(file, CHUNKSIZE))
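That would hand each worker a whole chunk of text rather than a single line, so work would have to do the line splitting itself. A rough sketch of what I mean (process_line stands in for your per-line task, which you haven't shown):

def work(piece):
    # split the raw text chunk into lines inside the worker
    for line in piece.splitlines():
        process_line(line)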
Upvotes: 1
Reputation: 411
I do not know if this is going to work, but could you try this?
if __name__ == "__main__":
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            for piece in read_in_chunks(file, CHUNKSIZE):
                piece_list = piece.splitlines()
                pool.map(work, piece_list)
My reasoning:
1. pool.map() only needs to be called once, but your code calls it in a loop
2. My guess is that the loop makes it slower
3. Because parallel processing should be faster hehe
Upvotes: 0