Reputation: 23038
I have a data.h5 file organised in multiple chunks; the entire file is several hundred gigabytes. I need to work with a filtered subset of the file in memory, in the form of a Pandas DataFrame.
The goal of the following routine is to distribute the filtering work across several processes, then concatenate the filtered results into the final DataFrame.
Since reading from the file takes a significant amount of time, I'm trying to have each process read its own chunk concurrently as well.
import multiprocessing as mp
import pandas as pd

store = pd.HDFStore('data.h5')
min_dset, max_dset = 0, len(store.keys()) - 1
dset_list = list(range(min_dset, max_dset + 1))
frames = []

def read_and_return_subset(dset):
    # each process is intended to read its own chunk concurrently
    chunk = store.select('batch_{:03}'.format(dset))
    # then filter the chunk and return the result
    output = chunk[chunk.some_condition == True]
    return output

with mp.Pool(processes=32) as pool:
    for frame in pool.map(read_and_return_subset, dset_list):
        frames.append(frame)

df = pd.concat(frames)
However, the above code triggers this error:
HDF5ExtError Traceback (most recent call last)
<ipython-input-174-867671c5a58f> in <module>()
53
54 with mp.Pool(processes=32) as pool:
---> 55 for frame in pool.map(read_and_return_subset, dset_list):
56 frames.append(frame)
57
/usr/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/usr/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
HDF5ExtError: HDF5 error back trace
File "H5Dio.c", line 173, in H5Dread
can't read data
File "H5Dio.c", line 554, in H5D__read
can't read data
File "H5Dchunk.c", line 1856, in H5D__chunk_read
error looking up chunk address
File "H5Dchunk.c", line 2441, in H5D__chunk_lookup
can't query chunk address
File "H5Dbtree.c", line 998, in H5D__btree_idx_get_addr
can't get chunk info
File "H5B.c", line 340, in H5B_find
unable to load B-tree node
File "H5AC.c", line 1262, in H5AC_protect
H5C_protect() failed.
File "H5C.c", line 3574, in H5C_protect
can't load entry
File "H5C.c", line 7954, in H5C_load_entry
unable to load entry
File "H5Bcache.c", line 143, in H5B__load
wrong B-tree signature
End of HDF5 error back trace
Problems reading the array data.
It seems that Pandas/PyTables has trouble accessing the same file concurrently, even if it's only for reading.
Is there a way to make each process read its own chunk concurrently?
Upvotes: 2
Views: 2285
Reputation: 210812
IIUC, you can index the columns that are used for filtering (chunk.some_condition == True in your sample code) and then read only the subset of data that satisfies the needed conditions.
In order to be able to do that you need to:

1. save your HDF5 file in table format - use the parameter format='table'
2. index the columns used for filtering - use the parameter data_columns=['col_name1', 'col_name2', etc.]
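For example, a minimal sketch of the writing side (the frame, key, and column names here are placeholders for illustration, not taken from the question):

import pandas as pd

# placeholder frame standing in for one batch of the real data
df = pd.DataFrame({'col1': [11, 12, 13],
                   'col2': ['AAA', 'BBB', 'CCC']})

# write in 'table' format and index the columns used for filtering,
# so that 'where' queries can be evaluated on disk at read time
df.to_hdf('data.h5', 'key_name', format='table',
          data_columns=['col1', 'col2'])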
After that you should be able to filter your data just by reading:
store = pd.HDFStore(filename)
df = store.select('key_name', where="col1 in [11,13] & col2 == 'AAA'")
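If you still want to parallelise the reads themselves, one common workaround (a sketch only, under the assumption that the file is not being written to while the workers run) is to have each worker open its own read-only HDFStore handle instead of inheriting the handle opened in the parent, which is the likely cause of the HDF5 error above:

import multiprocessing as mp
import pandas as pd

def read_and_return_subset(dset):
    # open a private, read-only handle inside the worker process
    with pd.HDFStore('data.h5', mode='r') as store:
        chunk = store.select('batch_{:03}'.format(dset))
    # 'some_condition' is the placeholder column from the question
    return chunk[chunk.some_condition == True]

if __name__ == '__main__':
    with pd.HDFStore('data.h5', mode='r') as store:
        n_dsets = len(store.keys())
    with mp.Pool(processes=8) as pool:
        frames = pool.map(read_and_return_subset, range(n_dsets))
    df = pd.concat(frames)

Combined with the indexed data_columns above, each worker can also push its filter into the where= clause of select, so that only the matching rows are read from disk.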
Upvotes: 2