Reputation: 31948
I have some humongous files I need to read in chunks due to their size.
I want to do a groupby and then a function on these files.
The problem is that if the chunksize is 50 000 and a group spans rows 49998-50002, that group gets split in two: one part lands in the first chunk and the rest in the second chunk. Is there a way to handle a group that straddles two chunks?
All solutions I could come up with feel rather un-Pandaish, so perhaps I should just solve this by reading the table in two passes.
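For instance, a rough sketch of the two-pass idea (the filename, the 'group' column name, and the per-group function below are just placeholders, and I haven't tested this on the real data):

    import pandas as pd

    def my_function(g):
        # placeholder for the real per-group computation
        return len(g)

    # Pass 1: read only the group column to find the row where each group starts
    groups = pd.read_csv('huge.csv', usecols=['group'])['group']
    boundaries = groups.index[groups != groups.shift()].tolist() + [len(groups)]

    # Pass 2: re-read the file in slices that always end on a group boundary
    target = 50_000          # desired (approximate) chunk size in rows
    start = 0
    for end in boundaries[1:]:
        if end - start >= target or end == len(groups):
            # skip the already-processed data rows (file row 0 is the header)
            chunk = pd.read_csv('huge.csv',
                                skiprows=range(1, start + 1),
                                nrows=end - start)
            chunk.groupby('group').apply(my_function)
            start = end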
Upvotes: 3
Views: 1692
Reputation: 880259
I know of no out-of-the-box function for this, but here is how you could roll your own:
remainder = pd.DataFrame()
for filename in filenames:                                                 # 1
    for chunk in pd.read_csv(filename, index_col=[0], chunksize=300):     # 2
        grouped = chunk.groupby(['group'])
        for grp, nextgrp in iwindow(grouped, 2):                           # 3
            group_num, df = grp                                            # 4
            if nextgrp is None:
                # When nextgrp is None, grp is the last group
                remainder = pd.concat([remainder, df])                     # 5
                break                                                      # 6
            if len(remainder):                                             # 7
                df = pd.concat([remainder, df])
                remainder = pd.DataFrame()
            print(filename)
            process(df)                                                    # 8
if len(remainder):                                                         # 9
    process(remainder)
1. Loop over each of the filenames. Note that remainder is carried across file boundaries too, so a group that spans two files is also handled.
2. chunksize=300 tells read_csv to read the file in chunks of 300 rows. I made it small for the example below. You can increase this to read more of the file per iteration.
3. iwindow is a sliding window utility function. It returns the items in grouped two at a time. For example,

       In [117]: list(iwindow([1,2,3], 2))
       Out[117]: [(1, 2), (2, 3), (3, None)]

4. df is a DataFrame with a constant group value (equal to group_num).
5. When nextgrp is None, grp is the last group in the chunk. It may be incomplete -- the rest of its rows may arrive with the next chunk -- so stash it in remainder instead of processing it.
6. Break out of the loop; the stashed group is picked up again when the next chunk is read (or at the very end).
7. If remainder holds some unprocessed DataFrame, prepend it to df.
8. Process the now-complete group df.
9. At the very end, remainder might hold one last unprocessed DataFrame. So process it now.

The general idea being employed here is useful whenever you need to read a file in chunks but process the chunks according to some other delimiter. Essentially the same idea is used here to break a file into chunks delimited by a regex pattern. (A compact generator-based sketch of the same pattern follows the full example below.)
For example,
import itertools as IT
import numpy as np
import pandas as pd

def make_data(groupsize, ngroups, filenames):
    # Write ngroups contiguous groups of random data, split across the given files
    nfiles = len(filenames)
    group_num = np.repeat(np.arange(ngroups), groupsize)
    arr = np.random.randint(10, size=(len(group_num), 2))
    arr = np.column_stack([group_num, arr])
    for arri, filename in zip(np.array_split(arr, nfiles), filenames):
        df = pd.DataFrame(arri, columns=['group', 'A', 'B'])
        df.to_csv(filename)

def iwindow(iterable, n=2, fillvalue=None):
    """
    Returns a sliding window (of width n) over data from the sequence.
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    iterables = IT.tee(iterable, n)
    iterables = (IT.islice(it, pos, None) for pos, it in enumerate(iterables))
    for result in IT.zip_longest(*iterables, fillvalue=fillvalue):
        yield result

def process(df):
    print(df)
    print('-' * 80)

filenames = ['/tmp/data-{}.csv'.format(i) for i in range(3)]
make_data(groupsize=40, ngroups=5, filenames=filenames)

remainder = pd.DataFrame()
for filename in filenames:
    for chunk in pd.read_csv(filename, index_col=[0], chunksize=300):
        grouped = chunk.groupby(['group'])
        for grp, nextgrp in iwindow(grouped, 2):
            group_num, df = grp
            if nextgrp is None:
                # When nextgrp is None, grp is the last group in this chunk;
                # it may continue in the next chunk, so save it for later
                remainder = pd.concat([remainder, df])
                break
            if len(remainder):
                # Prepend the leftover rows from the previous chunk/file
                df = pd.concat([remainder, df])
                remainder = pd.DataFrame()
            print(filename)
            process(df)
if len(remainder):
    # The very last group never had a "next" group, so process it now
    process(remainder)
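As a follow-up, here is a minimal sketch (not part of the code above) of how the same remainder-carrying idea could be wrapped in a generator that yields one complete group at a time. It assumes, as in the question, that rows belonging to one group are contiguous; the helper name complete_groups and the key='group' default are just illustrative:

    import pandas as pd

    def complete_groups(filenames, chunksize=300, key='group'):
        """Yield (group_value, DataFrame) pairs without ever splitting a group
        across chunk or file boundaries."""
        remainder = pd.DataFrame()
        for filename in filenames:
            for chunk in pd.read_csv(filename, index_col=[0], chunksize=chunksize):
                # Prepend whatever was left over from the previous chunk/file
                chunk = pd.concat([remainder, chunk])
                keys = chunk[key].unique()
                # Every group except the last one in this chunk is certainly complete
                for k in keys[:-1]:
                    yield k, chunk[chunk[key] == k]
                # The last group may continue in the next chunk, so carry it forward
                remainder = chunk[chunk[key] == keys[-1]]
        if len(remainder):
            yield remainder[key].iloc[0], remainder

    # Usage, reusing process() and filenames from the example above:
    # for group_num, df in complete_groups(filenames):
    #     process(df)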
Upvotes: 2