The Unfun Cat

Reputation: 31948

Groupby on chunks might lead to groups split between chunks

I have some humongous files I need to read in chunks due to their size.

I want to do a groupby on these files and then run a function on each group.

The problem is, if the chunksize is 50 000 and a group spans rows 49998-50002, that group gets split in two: one part lands in the first chunk and the rest in the second. Is there a way to handle groups that straddle chunk boundaries?

All solutions I could come up with feel rather un-Pandaish so perhaps I should just solve this problem by reading the table in two passes.
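
To make this concrete, here is a minimal sketch of what I mean (the file name and column name stand in for my real data):

import pandas as pd

# A group whose rows straddle the 50 000-row mark shows up once in
# each chunk, each time with only part of its rows.
for chunk in pd.read_csv('data.csv', chunksize=50000):
    for key, part in chunk.groupby('group'):
        print(key, len(part))  # a straddling key prints more than once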

Upvotes: 3

Views: 1692

Answers (1)

unutbu

Reputation: 880259

I know of no out-of-the-box function for this, but here is how you could roll your own:

remainder = pd.DataFrame()
for filename in filenames:                                            # 1
    for chunk in pd.read_csv(filename, index_col=[0], chunksize=300): # 2 
        grouped = chunk.groupby(['group'])
        for grp, nextgrp in iwindow(grouped, 2):                      # 3
            group_num, df = grp                                       # 4
            if nextgrp is None:
                # When nextgrp is None, grp is the chunk's last group;
                # more of it may arrive in the next chunk.
                if len(remainder) and remainder['group'].iloc[0] != df['group'].iloc[0]:
                    # remainder holds a different, already-complete group
                    process(remainder)
                    remainder = df
                else:
                    remainder = pd.concat([remainder, df])            # 5
                break                                                 # 6
            if len(remainder):                                        # 7
                # Prepend leftover rows only if they belong to this group;
                # if the previous group ended exactly at a chunk boundary,
                # it is complete, so process it on its own.
                if remainder['group'].iloc[0] == df['group'].iloc[0]:
                    df = pd.concat([remainder, df])
                else:
                    process(remainder)
                remainder = pd.DataFrame()
            print(filename)
            process(df)                                               # 8
if len(remainder):                                                    # 9
    process(remainder)
  1. Obviously, we need to iterate over every file
  2. Read the file in chunks. chunksize=300 tells read_csv to read the file 300 rows at a time. I made it small for the example below; you can increase it to read more of the file per iteration.
  3. iwindow is a sliding window utility function. It returns the items in grouped two at a time. For example,

    In [117]: list(iwindow([1,2,3], 2))
    Out[117]: [(1, 2), (2, 3), (3, None)]
    
  4. df is a DataFrame with a constant group value (equal to group_num).

  5. Don't process the chunk's last DataFrame yet: it might be partial, with more of the same group coming in the next chunk. Save it in remainder instead. (If remainder already holds a different, completed group, process that one first.)
  6. Break out of the inner loop. Go on to the next chunk (if any).
  7. If remainder holds unprocessed rows belonging to the current group, prepend them to df. If the previous group happened to end exactly at a chunk boundary, it is already complete, so process it on its own.
  8. Finally, process df
  9. remainder might hold one last unprocessed DataFrame. So process it now.

The general idea employed here is useful whenever you need to read a file in chunks but process the contents according to some other delimiter. Essentially the same idea can be used to break a file into chunks delimited by a regex pattern.
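
For instance, here is a rough sketch of that remainder-carrying idea applied to regex-delimited records (the function name, pattern handling, and block size are illustrative, not code from that answer):

import re

def chunks_by_pattern(filename, pattern, blocksize=4096):
    # Read fixed-size blocks, but yield pieces split on `pattern`,
    # carrying any trailing partial piece over to the next block.
    delim = re.compile(pattern)
    remainder = ''
    with open(filename) as f:
        while True:
            block = f.read(blocksize)
            if not block:
                break
            pieces = delim.split(remainder + block)
            remainder = pieces.pop()  # may be cut mid-record; keep it
            for piece in pieces:
                yield piece
    if remainder:
        yield remainder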


For example,

import itertools as IT
import numpy as np
import pandas as pd

def make_data(groupsize, ngroups, filenames):
    """Write ngroups groups of groupsize rows each, split across the given files."""
    nfiles = len(filenames)
    # groupsize copies of each group label: 0,...,0, 1,...,1, ...
    group_num = np.repeat(np.arange(ngroups), groupsize)
    arr = np.random.randint(10, size=(len(group_num), 2))
    arr = np.column_stack([group_num, arr])
    # Split the rows roughly evenly across the files
    for arri, filename in zip(np.array_split(arr, nfiles), filenames):
        df = pd.DataFrame(arri, columns=['group', 'A', 'B'])
        df.to_csv(filename)

def iwindow(iterable, n=2, fillvalue=None):
    """
    Returns a sliding window (of width n) over data from the sequence,
    padded at the end with fillvalue.
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    iterables = IT.tee(iterable, n)
    iterables = (IT.islice(it, pos, None) for pos, it in enumerate(iterables))
    for result in IT.zip_longest(*iterables, fillvalue=fillvalue):
        yield result

def process(df):
    print(df)
    print('-'*80)

filenames = ['/tmp/data-{}.csv'.format(i) for i in range(3)]
make_data(groupsize=40, ngroups=5, filenames=filenames)

remainder = pd.DataFrame()
for filename in filenames:
    for chunk in pd.read_csv(filename, index_col=[0], chunksize=300):
        grouped = chunk.groupby(['group'])
        for grp, nextgrp in iwindow(grouped, 2):
            group_num, df = grp
            if nextgrp is None:
                # When nextgrp is None, grp is the chunk's last group;
                # more of it may arrive in the next chunk.
                if len(remainder) and remainder['group'].iloc[0] != df['group'].iloc[0]:
                    # remainder holds a different, already-complete group
                    process(remainder)
                    remainder = df
                else:
                    remainder = pd.concat([remainder, df])
                break
            if len(remainder):
                # Prepend leftover rows only if they belong to this group;
                # otherwise process the completed previous group on its own.
                if remainder['group'].iloc[0] == df['group'].iloc[0]:
                    df = pd.concat([remainder, df])
                else:
                    process(remainder)
                remainder = pd.DataFrame()
            print(filename)
            process(df)
if len(remainder):
    process(remainder)
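
If the per-group "function" from the question is an aggregation, process can just as well collect results instead of printing them. For instance (an illustration, not part of the recipe above):

results = {}

def process(df):
    # df holds all rows of one group; record a per-group aggregate,
    # e.g. the sum of column A.
    results[df['group'].iloc[0]] = df['A'].sum()

After the loop finishes, results maps each group value to its aggregate.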

Upvotes: 2
