Reputation: 319
I am trying to load numpy files asynchronously in a Pool:
self.pool = Pool(2, maxtasksperchild = 1)
...
nextPackage = self.pool.apply_async(loadPackages, (...))
for fi in np.arange(len(files)):
    packages = nextPackage.get(timeout=30)
    # Preload the next package asynchronously. It will be available
    # by the time it is required.
    nextPackage = self.pool.apply_async(loadPackages, (...))
The method "loadPackages":
def loadPackages(... (2 strings & 2 ints) ...):
    print("This isn't printed!")
    packages = {
        "TRUE": np.load(gzip.GzipFile(path1, "r")),
        "FALSE": np.load(gzip.GzipFile(path2, "r"))
    }
    return packages
Before even the first "package" is loaded, the following error occurs:
Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\pool.py", line 463, in _handle_results
    task = get()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError
I monitor the resources closely: memory is not an issue, I still have plenty left when the error occurs. The unzipped files are just plain multidimensional numpy arrays. Using a Pool with a simpler method works, and loading the files like that on their own works. It only fails when the two are combined. (All this happens in a custom keras generator. I doubt this helps, but who knows.) Python 3.5.
What could the cause of this issue be? How can this error be interpreted?
Thank you for your help!
Upvotes: 6
Views: 4872
Reputation: 611
There is a bug in Python's C core code that prevents data responses bigger than 2 GB from being returned correctly to the main thread. You need to either split the data into smaller chunks, as suggested in the other answer, or not use multiprocessing for this function.
I reported this bug to the Python bug tracker (https://bugs.python.org/issue34563) and created a PR (https://github.com/python/cpython/pull/9027) to fix it, but it will probably take a while to get released. (UPDATE: the fix is present in Python 3.8.0+.)
If you are interested, you can find more details on what causes the bug in the bug description linked above.
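As a rough illustration of the "split the data into smaller chunks" option, here is a minimal sketch for numpy arrays. The helper name `split_by_bytes` and the byte limit are made up for this example, and the array is tiny on purpose. The idea is that each piece would be returned by its own task, so that no single result gets anywhere near 2 GB, and the main process would put the pieces back together.

```python
import numpy as np

def split_by_bytes(arr, max_bytes):
    """Split arr along axis 0 into pieces of roughly max_bytes or less.

    Hypothetical helper: assumes arr has at least one dimension. Pieces can
    slightly exceed max_bytes when the rows do not divide evenly.
    """
    if arr.nbytes <= max_bytes:
        return [arr]
    # Ceiling division gives the number of pieces, capped at the row count.
    n_pieces = min(len(arr), -(-arr.nbytes // max_bytes))
    return np.array_split(arr, n_pieces, axis=0)

# Small stand-in for a large array: 100 rows x 10 float64 values = 8000 bytes.
data = np.arange(1000, dtype=np.float64).reshape(100, 10)
pieces = split_by_bytes(data, max_bytes=2000)

# The receiving side reassembles the pieces in order.
restored = np.concatenate(pieces, axis=0)
```

In a real setup the limit would be chosen well below 2 GB, and each worker task would load and return only one piece.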
Upvotes: 10
Reputation:
I think I've found a workaround: retrieving the data in small chunks. In my case it was a list of lists.
I had:
for i in range(0, NUMBER_OF_THREADS):
    print('MAIN: Getting data from process ' + str(i) + ' proxy...')
    X_train.extend(ListasX[i]._getvalue())
    Y_train.extend(ListasY[i]._getvalue())
    ListasX[i] = None
    ListasY[i] = None
    gc.collect()
Changed to:
CHUNK_SIZE = 1024
for i in range(0, NUMBER_OF_THREADS):
    print('MAIN: Getting data from process ' + str(i) + ' proxy...')
    for k in range(0, len(ListasX[i]), CHUNK_SIZE):
        X_train.extend(ListasX[i][k:k+CHUNK_SIZE])
        Y_train.extend(ListasY[i][k:k+CHUNK_SIZE])
    ListasX[i] = None
    ListasY[i] = None
    gc.collect()
And now it seems to work, possibly by serializing less data at a time. So maybe if you can segment your data into smaller portions you can overcome the issue. Good luck!
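The same pattern can be wrapped in a small helper. This is only a sketch: `extend_in_chunks` is a name invented here, and the example uses a plain list as the source. With a manager proxy, as in my case, each slice would be a separate, smaller transfer, assuming the proxy supports `len()` and slicing.

```python
def extend_in_chunks(dest, source, chunk_size=1024):
    """Append the items of a sliceable source to dest, chunk_size at a time."""
    for start in range(0, len(source), chunk_size):
        # Each slice is fetched and appended separately.
        dest.extend(source[start:start + chunk_size])
    return dest

# Plain-list stand-in for the data held by a worker process.
collected = extend_in_chunks([], list(range(2500)), chunk_size=1024)
```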
Upvotes: 2