Reputation:
What I currently have is a fairly naive implementation that chunks the data and processes it. It runs in around 34 seconds for an 816 MB file, but I want it to be faster than that. I've already profiled it to see which parts take the most time, but almost all of the time is spent inside Python module functions, so I'm stuck as to what I can do to improve performance. Any and all help would be very welcome. I've included the profile and the relevant code below.
Sun May 5 02:10:28 2013 chunking.prof
50868044 function calls in 42.901 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
358204 13.791 0.000 30.722 0.000 reader_variants.py:361(_unpack_from)
7164080 7.331 0.000 10.812 0.000 reader_variants.py:116(_null_terminate)
10029712 4.762 0.000 4.762 0.000 reader_variants.py:210(_missing_values_mod)
1 4.117 4.117 4.117 4.117 {numpy.core.multiarray.array}
716407 3.751 0.000 5.927 0.000 {map}
17193696 2.176 0.000 2.176 0.000 reader_variants.py:358(<lambda>)
7164080 1.906 0.000 1.906 0.000 {method 'lstrip' of 'str' objects}
7164080 1.574 0.000 1.574 0.000 {method 'index' of 'str' objects}
358204 1.204 0.000 37.672 0.000 reader_variants.py:353(parse_records)
358204 1.135 0.000 1.135 0.000 {_struct.unpack_from}
1 0.417 0.417 42.901 42.901 <string>:1(<module>)
779 0.349 0.000 38.021 0.049 reader_variants.py:349(process_chunk)
779 0.330 0.000 0.330 0.000 {method 'read' of 'file' objects}
358983 0.041 0.000 0.041 0.000 {len}
1 0.009 0.009 42.484 42.484 reader_variants.py:306(genfromdta_cc)
779 0.006 0.000 0.006 0.000 {method 'extend' of 'list' objects}
48 0.000 0.000 0.000 0.000 reader_variants.py:324(<lambda>)
1 0.000 0.000 4.117 4.117 numeric.py:256(asarray)
1 0.000 0.000 0.000 0.000 {method 'seek' of 'file' objects}
1 0.000 0.000 0.000 0.000 {sum}
1 0.000 0.000 0.000 0.000 {method 'join' of 'str' objects}
1 0.000 0.000 0.000 0.000 {zip}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
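(For reference, the profile above was produced with the standard cProfile/pstats modules, roughly along these lines; the call string below is just a stand-in for however the reader is actually invoked.)
import cProfile
import pstats

# 'reader.genfromdta_cc()' is a stand-in for the actual call being profiled
cProfile.run('reader.genfromdta_cc()', 'chunking.prof')
pstats.Stats('chunking.prof').sort_stats('time').print_stats()  # sort by internal time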
def genfromdta_cc(self, missing_flt=-999., encoding=None, pandas=False,
                  convert_dates=True, size=1024*1024):  # default chunk size 1mb
    """
    reads stata data by chunking the file
    """
    try:
        self._file.seek(self._data_location)
    except Exception:
        pass
    nobs = self._header['nobs']
    varnames = self._header['varlist']
    typlist = self._header['typlist']
    types = self._header['dtyplist']
    dt = np.dtype(zip(varnames, types))
    data = []
    fmt = ''.join(map(lambda x: str(x)+'s' if type(x) is int else x, typlist))
    record_size = sum(self._col_sizes)
    maxrecords = size/record_size  # max number of records we can fit in size
    if maxrecords > nobs:  # if the file is smaller than the ideal chunk size
        chunk_size = nobs*record_size  # read the entire file in
    else:
        chunk_size = maxrecords * record_size
    chunk_size_leftover = (nobs*record_size) % chunk_size
    numchunks = nobs / maxrecords  # number of chunks
    numchunks_leftover = nobs % maxrecords  # number of records left over
    for i in xrange(numchunks):
        chunk = self._file.read(chunk_size)
        data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))
    # last chunk contains less than max number of records
    if numchunks_leftover > 0:
        chunk = self._file.read(chunk_size_leftover)
        data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))
    return np.asarray(data, dtype=dt)  # return data as numpy array

def process_chunk(self, chunk, fmt, record_size, missing_flt):
    iternum = len(chunk)/record_size  # number of records to read
    return [self.parse_records(chunk, fmt, record_size, missing_flt, i)
            for i in xrange(iternum)]

def parse_records(self, chunk, fmt, record_size, missing_flt, offset):
    # create record
    record = self._unpack_from(fmt, chunk, record_size*offset)
    # check to see if None in record
    if None in record:
        record = map(lambda x: missing_flt if x is None else x, record)
    return tuple(record)

def _unpack_from(self, fmt, byt, offset):
    typlist = self._header['typlist']
    d = map(None, unpack_from(self._header['byteorder']+fmt, byt, offset))
    d = [self._null_terminate(d[i], self._encoding) if type(typlist[i]) is int
         else self._missing_values_mod(d[i], typlist[i])
         for i in xrange(len(d))]
    return d
Upvotes: 0
Views: 152
Reputation: 43495
One approach you could take is to just read the whole file into memory. Assuming you have several GB of RAM (not uncommon even on a PC several years old), 816 MB should fit into RAM. In that case you can do away with the chunking:
import struct

with open('datafile.bin', 'rb') as df:   # binary mode for struct data
    rawdata = df.read()

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)
results = []
for offset in xrange(0, len(rawdata), recsz):   # step through one record at a time
    results.append(struct.unpack_from(fmt, rawdata, offset))
From your code it looks like the records have a constant size. So even if you don't want to read the whole file into memory, you can read the file in record-size pieces:
import struct

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)
results = []
with open('datafile.bin', 'rb') as df:
    while True:
        s = df.read(recsz)
        if len(s) < recsz:   # end of file (or truncated record)
            break
        results.append(struct.unpack(fmt, s))
This approach can also be spread over all your cores using multiprocessing.Pool.map(). So if you have n cores, you can have n processes reading and unpacking records. At best this can reduce the time you need to 1/n. (In reality it will take somewhat longer, because the records have to be pickled and sent back to the master process.)
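A rough sketch of how that could look, reusing the same fixed-size format string as above (the file name, the worker count and the unpack_chunk helper are only illustrative):
import struct
from multiprocessing import Pool

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)

def unpack_chunk(chunk):
    # unpack every fixed-size record contained in this chunk of bytes
    return [struct.unpack_from(fmt, chunk, offset)
            for offset in xrange(0, len(chunk), recsz)]

if __name__ == '__main__':
    with open('datafile.bin', 'rb') as df:
        rawdata = df.read()
    nworkers = 4   # e.g. one chunk per core
    # chunk size rounded to a whole number of records so boundaries stay aligned
    step = max(1, (len(rawdata) // recsz + nworkers - 1) // nworkers) * recsz
    chunks = [rawdata[i:i + step] for i in xrange(0, len(rawdata), step)]
    pool = Pool(nworkers)
    # each worker unpacks one chunk; its records are pickled back to this process
    results = []
    for partial in pool.map(unpack_chunk, chunks):
        results.extend(partial)
    pool.close()
    pool.join()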
(N.B.: It would help if you could show us the kind of data you're reading, e.g. the format string you use.)
Upvotes: 1