Reputation: 168
I have a python function that reads random snippets from a large file and does some processing on it. I want the processing to happen in multiple processes and so make use of multiprocessing. I open the file (in binary mode) in the parent process and pass the file descriptor to each child process then use a multiprocessing.Lock() to synchronize access to the file. With a single worker things work as expected, but with more workers, even with the lock, the file reads will randomly return bad data (usually a bit from one part of the file and a bit from the another part of the file). In addition, the position within the file (as returned by file.tell()) will often get messed up. This all suggests a basic race condition accessing the descriptor, but my understanding is the multiprocessing.Lock() should prevent concurrent access to it. Does file.seek() and/or file.read() have some kind of asynchronous operations that don't get contained within the lock/unlock barriers? What is going here?
An easy workaround is to have each process open the file individually and get its own file descriptor (I've confirmed this does work), but I'd like to understand what I'm missing. Opening the file in text mode also prevents the issue from occurring, but doesn't work for my use case and doesn't explain what is happening in the binary case.
I've run the following reproducer on a number of Linux systems and OS X and on various local and remote file systems. I always get quite a few bad file positions and at least a couple of checksum errors. I know the read isn't guaranteed to read the full amount of data requested, but I've confirmed that is not what is happening here and omitted that code in an effort to keep things concise.
import argparse
import multiprocessing
import random
import string
def worker(worker, args):
rng = random.Random(1234 + worker)
for i in range(args.count):
block = rng.randrange(args.blockcount)
start = block * args.blocksize
with args.lock:
args.fd.seek(start)
data = args.fd.read(args.blocksize)
pos = args.fd.tell()
if pos != start + args.blocksize:
print(i, "bad file position", start, start + args.blocksize, pos)
cksm = sum(data)
if cksm != args.cksms[block]:
print(i, "bad checksum", cksm, args.cksms[block])
args = argparse.Namespace()
args.file = '/tmp/text'
args.count = 1000
args.blocksize = 1000
args.blockcount = args.count
args.filesize = args.blocksize * args.blockcount
args.num_workers = 4
args.cksms = multiprocessing.Array('i', [0] * args.blockcount)
with open(args.file, 'w') as f:
for i in range(args.blockcount):
data = ''.join(random.choice(string.ascii_lowercase) for x in range(args.blocksize))
args.cksms[i] = sum(data.encode())
f.write(data)
args.fd = open(args.file, 'rb')
args.lock = multiprocessing.Lock()
procs = []
for i in range(args.num_workers):
p = multiprocessing.Process(target=worker, args=(i, args))
procs.append(p)
p.start()
Example output:
$ python test.py
158 bad file position 969000 970000 741000
223 bad file position 908000 909000 13000
232 bad file position 679000 680000 960000
263 bad file position 959000 960000 205000
390 bad file position 771000 772000 36000
410 bad file position 148000 149000 42000
441 bad file position 677000 678000 21000
459 bad file position 143000 144000 636000
505 bad file position 579000 580000 731000
505 bad checksum 109372 109889
532 bad file position 962000 963000 243000
494 bad file position 418000 419000 2000
569 bad file position 266000 267000 991000
752 bad file position 732000 733000 264000
840 bad file position 801000 802000 933000
799 bad file position 332000 333000 989000
866 bad file position 150000 151000 248000
866 bad checksum 109116 109375
887 bad file position 39000 40000 974000
937 bad file position 18000 19000 938000
969 bad file position 20000 21000 24000
953 bad file position 542000 543000 767000
977 bad file position 694000 695000 782000
Upvotes: 2
Views: 1218
Reputation: 401
I've checked, only using multiprocessing.Lock (without buffering = 0), still met the bad data
. with both multiprocessing.Lock
and buffering=0
, all things goes well
Upvotes: 0
Reputation: 32073
This seems to be caused by buffering: using open(args.file, 'rb', buffering=0)
I can't reproduce anymore.
https://docs.python.org/3/library/functions.html#open
buffering is an optional integer used to set the buffering policy. Pass 0 to switch buffering off [...] When no buffering argument is given, the default buffering policy works as follows: [...] Binary files are buffered in fixed-size chunks; the size of the buffer [...] will typically be 4096 or 8192 bytes long. [...]
Upvotes: 1