Reputation: 8576
A few days back I answered a question on SO regarding reading a tar file in parallel.
This was the gist of the question:
import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')


def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading bz2 file
            ....
            ....


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial()  # No error
    process_parallel()  # Error


if __name__ == '__main__':
    main()
We were able to make the error disappear by just opening the tar file inside the child process rather than in the parent, as mentioned in the answer.
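For reference, a minimal sketch of that fix, not the exact code from the answer. It assumes the archive path and the member names are passed to the workers as plain strings; counting lines is only a placeholder for the elided processing.

```python
import bz2
import tarfile
from functools import partial
from multiprocessing import Pool


def clean_file(tar_path, member_name):
    """Open the archive inside the worker, so no file handle is shared."""
    if not member_name.endswith('.bz2'):
        return None
    with tarfile.open(tar_path) as tr:
        with tr.extractfile(member_name) as bz2_file:
            with bz2.open(bz2_file, 'rt') as bzinput:
                # Placeholder for the real processing: count the lines.
                return sum(1 for _ in bzinput)


def process_parallel(tar_path):
    # Read the member list once in the parent and send only the names.
    with tarfile.open(tar_path) as tr:
        names = [m.name for m in tr.getmembers() if m.isfile()]
    with Pool() as pool:
        return pool.map(partial(clean_file, tar_path), names)
```

This reopens the archive for every member, which is simple but not the cheapest option; opening it once per worker would reduce that overhead.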
I don't understand why this worked.
Even if we open the tarfile in the parent process, the child process will get a new copy. So why does opening the tarfile in the child process explicitly make any difference?
Does this mean that in the first case, the child processes were somehow mutating the common tarfile object and causing memory corruption due to concurrent writes?
Upvotes: 3
Views: 150
Reputation: 764
FWIW, the answer in the comments wrt open() is actually incorrect on UNIX-like systems regarding file handle numbers.
If multiprocessing uses fork() (which it does under Linux and similar, although I read there was an issue with forking on macOS), the file handles and everything else are happily copied to child processes (by "happily" I mean it's complicated in many edge cases such as forking threads, but still it works fine for file handles).
The following works fine for me:
import multiprocessing

this = open(__file__, 'r')


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()
The problem is likely that tarfile has internal state and/or buffering while reading. You can also simply run into conflicts by trying to seek and read different parts of the same archive simultaneously. I.e., I'm speculating that using a thread pool without any synchronization is likely to run into exactly the same issues in this case.
Edit: to clarify, extracting a file from a tar archive is likely (I haven't checked the exact details) done as follows: (1) seek to the offset of the encapsulated part (file), (2) read a chunk of the encapsulated file and write the chunk to the destination file (or pipe, or whatever), (3) repeat (2) until the whole file is extracted.
Attempting to do this in a non-synchronized way from parallel processes using the same file handle will likely mix up these steps: starting to process file #2 seeks away from file #1 while we are still in the middle of reading file #1, and so on.
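The mixing of steps can be sketched in a single process. This is a toy model, not tarfile's actual internals: the "archive" is two 8-byte members back to back, and the hypothetical helpers `extract_in_chunks` and `interleave` stand in for two unsynchronized workers taking turns.

```python
import io
from itertools import zip_longest


def extract_in_chunks(handle, offset, size, chunk=4):
    """Seek once, then yield the member chunk by chunk (steps 1 to 3)."""
    handle.seek(offset)
    remaining = size
    while remaining > 0:
        data = handle.read(min(chunk, remaining))
        if not data:  # hit EOF early: the offset was moved by someone else
            break
        remaining -= len(data)
        yield data


def interleave(gen_a, gen_b):
    """Alternate between two extractions, as unsynchronized workers might."""
    out_a, out_b = [], []
    for part_a, part_b in zip_longest(gen_a, gen_b, fillvalue=b''):
        out_a.append(part_a)
        out_b.append(part_b)
    return b''.join(out_a), b''.join(out_b)


archive = b'AAAAAAAABBBBBBBB'  # two 8-byte "members"

# One shared handle: the second seek pulls the offset away from member 1.
shared = io.BytesIO(archive)
broken = interleave(extract_in_chunks(shared, 0, 8),
                    extract_in_chunks(shared, 8, 8))

# One handle per extraction: each has its own offset.
separate = interleave(extract_in_chunks(io.BytesIO(archive), 0, 8),
                      extract_in_chunks(io.BytesIO(archive), 8, 8))

print(broken)
print(separate)
```

With the shared handle, the first "member" comes back with bytes from the second one and the second comes back truncated. With separate handles, both come back intact.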
Edit2, answering the comment below: memory is indeed copied afresh for child processes, that's true; but resources managed on the kernel side (such as file handles, including their current file offset, and kernel buffers) are shared.
To illustrate:
import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []
    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()
Running this on Linux I get:
$ python3.8 test.py
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''
If seeking and reading were independent, both processes would print an identical result, but they don't. Since this is a small file, and Python opts to buffer a small amount of data (8 KiB), the first process reads all the way to EOF, and the second process has no data left to read (unless, of course, it seeks back).
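The same contrast can be shown without forking or Python-level buffering. This is a sketch under one assumption: `os.dup()` is used as a stand-in for a descriptor inherited through fork(), since both refer to the same kernel-side open file and therefore share its offset. The temporary file and the helper `read_twice` are made up for the demonstration.

```python
import os
import tempfile


def read_twice(fd_a, fd_b, n=4):
    """Read n bytes through each descriptor, bypassing Python's buffering."""
    return os.read(fd_a, n), os.read(fd_b, n)


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'0123456789')
    path = tmp.name

# Case 1: a duplicated descriptor shares the file offset with the original.
fd = os.open(path, os.O_RDONLY)
fd_dup = os.dup(fd)
shared_result = read_twice(fd, fd_dup)
os.close(fd)
os.close(fd_dup)

# Case 2: two independent open() calls, each with its own offset.
fd1 = os.open(path, os.O_RDONLY)
fd2 = os.open(path, os.O_RDONLY)
independent_result = read_twice(fd1, fd2)
os.close(fd1)
os.close(fd2)

os.remove(path)

print(shared_result)
print(independent_result)
```

In the first case the second read continues where the first one stopped. In the second case both reads start at the beginning of the file, which is why opening the archive separately in each child avoids the conflict.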
Upvotes: 1