Jivan

Reputation: 23038

Concurrently read an HDF5 file in Pandas

I have a data.h5 file organised into multiple chunks, the entire file being several hundred gigabytes. I need to work with a filtered subset of the file in memory, in the form of a Pandas DataFrame.

The goal of the following routine is to distribute the filtering work across several processes, then concatenate the filtered results into the final DataFrame.

Since reading from the file takes a significant amount of time, I'm trying to make each process read its own chunk concurrently as well.

import multiprocessing as mp, pandas as pd

store = pd.HDFStore('data.h5')
min_dset, max_dset = 0, len(store.keys()) - 1
dset_list = list(range(min_dset, max_dset + 1))

frames = []

def read_and_return_subset(dset):
    # each process is intended to read its own chunk in a concurrent manner
    chunk = store.select('batch_{:03}'.format(dset))

    # and then process the chunk, do the filtering, and return the result
    output = chunk[chunk.some_condition == True]
    return output


with mp.Pool(processes=32) as pool:
    for frame in pool.map(read_and_return_subset, dset_list):
        frames.append(frame)

df = pd.concat(frames)

However, the above code triggers this error:

HDF5ExtError                              Traceback (most recent call last)
<ipython-input-174-867671c5a58f> in <module>()
     53 
     54     with mp.Pool(processes=32) as pool:
---> 55         for frame in pool.map(read_and_return_subset, dset_list):
     56             frames.append(frame)
     57 

/usr/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):

HDF5ExtError: HDF5 error back trace

  File "H5Dio.c", line 173, in H5Dread
    can't read data
  File "H5Dio.c", line 554, in H5D__read
    can't read data
  File "H5Dchunk.c", line 1856, in H5D__chunk_read
    error looking up chunk address
  File "H5Dchunk.c", line 2441, in H5D__chunk_lookup
    can't query chunk address
  File "H5Dbtree.c", line 998, in H5D__btree_idx_get_addr
    can't get chunk info
  File "H5B.c", line 340, in H5B_find
    unable to load B-tree node
  File "H5AC.c", line 1262, in H5AC_protect
    H5C_protect() failed.
  File "H5C.c", line 3574, in H5C_protect
    can't load entry
  File "H5C.c", line 7954, in H5C_load_entry
    unable to load entry
  File "H5Bcache.c", line 143, in H5B__load
    wrong B-tree signature

End of HDF5 error back trace

Problems reading the array data.

It seems that Pandas/PyTables has trouble accessing the same file concurrently, even if it's only for reading.

Is there a way to make each process read its own chunk concurrently?

Upvotes: 2

Views: 2285

Answers (1)

MaxU - stand with Ukraine

Reputation: 210812

IIUC you can index the columns that are used for filtering (chunk.some_condition == True in your sample code) and then read only the subset of data that satisfies the needed conditions.

To do that you need to:

  1. save the HDF5 file in table format - use the parameter format='table'
  2. index the columns that will be used for filtering - use the parameter data_columns=['col_name1', 'col_name2', ...] (a short sketch follows this list)
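For example, a minimal writing sketch - the key name batch_000 and the column names col1/col2 are placeholders, not taken from your file:

import pandas as pd

# hypothetical sample chunk - column names are placeholders
df_chunk = pd.DataFrame({'col1': [11, 12, 13], 'col2': ['AAA', 'BBB', 'AAA']})

with pd.HDFStore('data.h5') as store:
    # format='table' stores the data in a queryable PyTables Table layout;
    # data_columns creates on-disk indexes for the columns used in `where` filters
    store.put('batch_000', df_chunk, format='table', data_columns=['col1', 'col2'])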

After that you should be able to filter your data just by reading:

store = pd.HDFStore(filename)
df = store.select('key_name', where="col1 in [11,13] & col2 == 'AAA'")
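If the data is split across many keys, as in your batch_{:03} layout, you can apply the same where filter per key and concatenate the results. A minimal sketch, assuming the column some_condition from your sample code was declared in data_columns:

frames = []
with pd.HDFStore('data.h5') as store:
    for key in store.keys():
        # only the rows matching the condition are read from disk for each key
        frames.append(store.select(key, where='some_condition == True'))
df = pd.concat(frames)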

Upvotes: 2
