user1707505

What can I do to make this binary file reader faster?

What I currently have is a pretty naive implementation that chunks the data and processes it. It runs in around 34 seconds for an 816 MB file, but I want it to be faster than that. I've already profiled it to see which parts take the most time, but most of the appreciable time is spent inside Python module functions, so I'm stuck as to what I can do to improve performance. Any and all help would be very welcome. I've included the profile and relevant code below.

Sun May  5 02:10:28 2013    chunking.prof

         50868044 function calls in 42.901 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   358204   13.791    0.000   30.722    0.000 reader_variants.py:361(_unpack_from)
  7164080    7.331    0.000   10.812    0.000 reader_variants.py:116(_null_terminate)
 10029712    4.762    0.000    4.762    0.000 reader_variants.py:210(_missing_values_mod)
        1    4.117    4.117    4.117    4.117 {numpy.core.multiarray.array}
   716407    3.751    0.000    5.927    0.000 {map}
 17193696    2.176    0.000    2.176    0.000 reader_variants.py:358(<lambda>)
  7164080    1.906    0.000    1.906    0.000 {method 'lstrip' of 'str' objects}
  7164080    1.574    0.000    1.574    0.000 {method 'index' of 'str' objects}
   358204    1.204    0.000   37.672    0.000 reader_variants.py:353(parse_records)
   358204    1.135    0.000    1.135    0.000 {_struct.unpack_from}
        1    0.417    0.417   42.901   42.901 <string>:1(<module>)
      779    0.349    0.000   38.021    0.049 reader_variants.py:349(process_chunk)
      779    0.330    0.000    0.330    0.000 {method 'read' of 'file' objects}
   358983    0.041    0.000    0.041    0.000 {len}
        1    0.009    0.009   42.484   42.484 reader_variants.py:306(genfromdta_cc)
      779    0.006    0.000    0.006    0.000 {method 'extend' of 'list' objects}
       48    0.000    0.000    0.000    0.000 reader_variants.py:324(<lambda>)
        1    0.000    0.000    4.117    4.117 numeric.py:256(asarray)
        1    0.000    0.000    0.000    0.000 {method 'seek' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {sum}
        1    0.000    0.000    0.000    0.000 {method 'join' of 'str' objects}
        1    0.000    0.000    0.000    0.000 {zip}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
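For reference, a profile like the one above can be produced with cProfile and pstats; the exact invocation behind chunking.prof is an assumption (reader below stands in for whatever object exposes genfromdta_cc), but it would look roughly like this:

import cProfile
import pstats

# run the chunked reader under the profiler and dump the stats to a file
cProfile.run('reader.genfromdta_cc()', 'chunking.prof')

# print the stats sorted by internal time, as in the listing above
pstats.Stats('chunking.prof').sort_stats('time').print_stats()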

def genfromdta_cc(self, missing_flt=-999., encoding=None, pandas=False,
                    convert_dates=True, size=1024*1024): # default chunk size 1mb
        """
        reads stata data by chunking the file
        """
        try:
            self._file.seek(self._data_location)
        except Exception:
            pass

        nobs = self._header['nobs']
        varnames = self._header['varlist']
        typlist = self._header['typlist']
        types = self._header['dtyplist']

        dt = np.dtype(zip(varnames, types))
        data=[]

        fmt = ''.join(map(lambda x: str(x)+'s' if type(x) is int else x, typlist))
        record_size = sum(self._col_sizes)

        maxrecords = size/record_size # max number of records we can fit in size

        if maxrecords > nobs: # if the file is smaller than the ideal chunk size
            chunk_size = nobs*record_size # read the entire file in
            chunk_size_leftover = chunk_size # whole file is read in the leftover pass below
        else:
            chunk_size = maxrecords * record_size
            chunk_size_leftover = (nobs*record_size) % chunk_size

        numchunks = nobs / maxrecords # number of full chunks
        numchunks_leftover = nobs % maxrecords # number of records left over

        for i in xrange(numchunks):
            chunk = self._file.read(chunk_size)
            data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))

        # last chunk contains less than max number of records
        if numchunks_leftover > 0:
            chunk = self._file.read(chunk_size_leftover)
            data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))

        return np.asarray(data, dtype=dt) # return data as numpy array

def process_chunk(self, chunk, fmt, record_size, missing_flt):      
        iternum = len(chunk)/record_size #number of records to read
        return [self.parse_records(chunk, fmt, record_size, missing_flt, i) for i in xrange(iternum)]

def parse_records(self, chunk, fmt, record_size, missing_flt, offset):
        # create record
        record = self._unpack_from(fmt, chunk, record_size*offset)
        # check to see if None in record
        if None in record:
            record = map(lambda x: missing_flt if x is None else x, record)
        return tuple(record)

def _unpack_from(self, fmt, byt, offset):
        typlist = self._header['typlist']
        d = map(None, unpack_from(self._header['byteorder']+fmt, byt, offset))
        d = [self._null_terminate(d[i], self._encoding) if type(typlist[i]) is int else self._missing_values_mod(d[i], typlist[i]) for i in xrange(len(d))]         
        return d

Upvotes: 0

Views: 152

Answers (1)

Roland Smith

Reputation: 43495

One approach you could take is to just read the whole file into memory. Assuming you have several GB of RAM (not uncommon even on a PC several years old), 816 MB should fit into RAM. In that case you can do away with the chunking:

import struct

with open('datafile.bin', 'rb') as df:
    rawdata = df.read()

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)

results = []
for offset in xrange(0, len(rawdata), recsz):
    results.append(struct.unpack_from(fmt, rawdata, offset))

From your code it looks like the records have a constant size? So even if you don't want to read the whole file into memory, you could read the file in record-size pieces:

import struct

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)

results = []
with open('datafile.bin', 'rb') as df:
    while True:
        s = df.read(recsz)
        if len(s) < recsz: # stop at end of file (or on a short read)
            break
        results.append(struct.unpack(fmt, s))

This approach can also be spread over all your cores using multiprocessing.Pool.map(). So if you have n cores, you can have n processes reading and unpacking records. At best this can reduce the time you need to 1/n. (In reality it will take somewhat more time, because the records have to be pickled and sent back to the master process.)
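A rough sketch of that idea, reusing the assumed file name and format string from the snippets above (the chunk size and the overall structure are likewise assumptions, not tested code):

import os
import struct
from multiprocessing import Pool

FILENAME = 'datafile.bin'
fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)
RECS_PER_CHUNK = 10000  # how many records each worker handles; tune to taste

def unpack_chunk(offset):
    # each worker opens the file itself, seeks to its chunk and unpacks the records in it
    with open(FILENAME, 'rb') as df:
        df.seek(offset)
        raw = df.read(RECS_PER_CHUNK * recsz)
    nrec = len(raw) // recsz  # the last chunk may be shorter than the rest
    return [struct.unpack_from(fmt, raw, i * recsz) for i in xrange(nrec)]

if __name__ == '__main__':
    offsets = range(0, os.path.getsize(FILENAME), RECS_PER_CHUNK * recsz)
    pool = Pool()  # one worker process per core by default
    results = []
    for records in pool.map(unpack_chunk, offsets):
        results.extend(records)
    pool.close()
    pool.join()

Each worker returns a list of tuples, which the master process concatenates; that return trip is where the pickling overhead mentioned above comes in.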

(N.B.: It would help if you could show us the kind of data you're reading, e.g. the format string you use.)

Upvotes: 1
