A. Sarid

Reputation: 3996

How to groupby and preserve groups order on sorted file

I have a large CSV file that is sorted by a few of its columns; let's call these columns sorted_columns.
I want to perform a groupby on these sorted_columns and apply some logic to each of these groups.

The file does not fit completely into memory, so I want to read it in chunks and perform a groupby on each chunk.

The thing I have noticed is that the order of the groups is not preserved even though the file is already sorted by these columns.

Eventually, this is what I am trying to do:

import pandas as pd

def run_logic(key, group):
    # some logic
    pass

last_group = pd.DataFrame()
last_key = None

for chunk_df in df:  # df is an iterator of chunks, e.g. from pd.read_csv(..., chunksize=...)
    grouped_by_df = chunk_df.groupby(sorted_columns, sort=True)

    for key, group in grouped_by_df:
        if last_key is None or last_key == key:
            last_key = key
            last_group = pd.concat([last_group, group])
        else:  # last_key != key
            run_logic(last_key, last_group)
            last_key = key
            last_group = group.copy()
run_logic(last_key, last_group)

But this does not work, because groupby does not guarantee that the order of the groups is preserved. If the same key exists in two consecutive chunks, there is no guarantee that it will be the last group of the first chunk and the first group of the next one. I tried changing the groupby to use sort=False, and I also tried changing the order of the columns, but it didn't help.

Does anyone have any idea of how to preserve the order of the groups if the keys are already sorted in the original file?

Is there any other way to read a complete group at once from the file?

Upvotes: 2

Views: 688

Answers (2)

Kristian

Reputation: 492

itertools.groupby

itertools.groupby will return the key and an iterator over all the values grouped by that key. If your file is already sorted by your desired key, you are good to go; the groupby function will handle almost everything for you.

From the documentation:

The operation of groupby() is similar to the uniq filter in Unix. It generates a break or new group every time the value of the key function changes (which is why it is usually necessary to have sorted the data using the same key function). That behavior differs from SQL’s GROUP BY which aggregates common elements regardless of their input order.
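
For example, a quick sketch of that behavior on toy data (not taken from the question's file):

from itertools import groupby

# groupby starts a new group every time the key changes, so the same key
# can appear more than once if the input is not sorted by that key
data = ["a", "a", "b", "a"]
print([(k, list(g)) for k, g in groupby(data)])
# [('a', ['a', 'a']), ('b', ['b']), ('a', ['a'])]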

run_logic is whatever business logic you want to apply to the group of records. This example simply counts the number of observations in the iterator.

data_iter emits one row of the CSV at a time. As long as your file is sorted by the desired fields, you do not need to read the entire file into memory.

chunks uses groupby to group the input iterator by the first 3 fields of each row. It yields the key and the corresponding iterator of values associated with that key.

#!/usr/bin/env python3

import csv
from itertools import groupby

def run_logic(key, group):
    cntr = 0
    for rec in group:
        cntr = cntr + 1
    return (key, cntr)


def data_iter(filename):
    with open(filename, "r") as fin:
        csvin = csv.reader(fin)
        for row in csvin:
            yield row


def chunks(diter):
    for chunk, iter_ in groupby(diter, key=lambda x: x[0:3]):
        yield (chunk, iter_)


if __name__ == "__main__":
    csviter = data_iter("test.csv")
    chunk_iter = chunks(csviter)
    for chunk, iter_ in chunk_iter:
        print(run_logic(chunk, iter_))

Input data

['1', '1', '1', 'a', 'a', 'a', 'a']  
['1', '1', '1', 'b', 'b', 'b', 'b']  
['1', '1', '1', 'c', 'c', 'c', 'c']  
['1', '1', '1', 'd', 'd', 'd', 'd']  
['1', '1', '1', 'e', 'e', 'e', 'e']  
['2', '1', '1', 'a', 'a', 'a', 'a']  
['2', '1', '1', 'd', 'd', 'd', 'd']  
['2', '1', '1', 'e', 'e', 'e', 'e']  
['2', '1', '1', 'b', 'b', 'b', 'b']  
['2', '1', '1', 'c', 'c', 'c', 'c']  
['3', '1', '1', 'e', 'e', 'e', 'e']  
['3', '1', '1', 'b', 'b', 'b', 'b']  
['3', '1', '1', 'c', 'c', 'c', 'c']  
['3', '1', '1', 'a', 'a', 'a', 'a']  
['3', '1', '1', 'd', 'd', 'd', 'd']

groupby data

Group: ['1', '1', '1']

['1', '1', '1', 'a', 'a', 'a', 'a']
['1', '1', '1', 'b', 'b', 'b', 'b']
['1', '1', '1', 'c', 'c', 'c', 'c']
['1', '1', '1', 'd', 'd', 'd', 'd']
['1', '1', '1', 'e', 'e', 'e', 'e']

Group: ['2', '1', '1']

['2', '1', '1', 'a', 'a', 'a', 'a']
['2', '1', '1', 'd', 'd', 'd', 'd']
['2', '1', '1', 'e', 'e', 'e', 'e']
['2', '1', '1', 'b', 'b', 'b', 'b']
['2', '1', '1', 'c', 'c', 'c', 'c']

Group: ['3', '1', '1']

['3', '1', '1', 'e', 'e', 'e', 'e']
['3', '1', '1', 'b', 'b', 'b', 'b']
['3', '1', '1', 'c', 'c', 'c', 'c']
['3', '1', '1', 'a', 'a', 'a', 'a']
['3', '1', '1', 'd', 'd', 'd', 'd']

Apply business logic

Group: ['1', '1', '1']

(['1', '1', '1'], 5)

Group: ['2', '1', '1']

(['2', '1', '1'], 5)

Group: ['3', '1', '1']

(['3', '1', '1'], 5)
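
If you still want to use pandas inside run_logic, a hedged sketch (assuming the same header-less test.csv and that any single group fits in memory) is to wrap each yielded group in a DataFrame:

import csv
from itertools import groupby

import pandas as pd

def run_logic(key, group_df):
    # whatever pandas logic you need on the per-group DataFrame
    print(key, len(group_df))

with open("test.csv", "r") as fin:
    # the file is sorted by the first 3 fields, so each group arrives contiguously
    for key, rows in groupby(csv.reader(fin), key=lambda row: row[0:3]):
        run_logic(key, pd.DataFrame(list(rows)))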

Upvotes: 4

Bernardo stearns reisen

Reputation: 2657

I believe that the essence of your problem is that you're trying to aggregate each group in a single pass over the dataframe. There's a tradeoff between how many groups you can fit in memory and how many times you need to read the dataframe.

NOTE: I am purposefully showing verbose code to convey the idea that it is necessary to iterate over the df many times. Both solutions got relatively complex, but they still achieve what is expected. There are many aspects of the code that can be improved; any help refactoring the code is appreciated.

I will use this dummy "data.csv" file to illustrate my solutions. If you save data.csv in the same directory as the script, you can just copy and paste the solutions and run them.

sorted1,sorted2,sorted3,othe1,other2,other3,other4 
1, 1, 1, 'a', 'a', 'a', 'a'  
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'  
2, 1, 1, 'a', 'a', 'a', 'a'  
2, 1, 1, 'd', 'd', 'd', 'd'
2, 1, 1, 'd', 'd', 'd', 'a'   
3, 1, 1, 'e', 'e', 'e', 'e'  
3, 1, 1, 'b', 'b', 'b', 'b'  

An initial solution for a scenario where we can store all the group keys:

Accumulate all the rows of a group first and process them afterwards.

Essentially, this is what I would do: in each pass over the df (in chunks), pick one group (or several, if memory allows). Check that it has not been processed yet by looking it up in a dictionary of processed group keys, then accumulate the selected group's rows chunk by chunk. When the iteration over all chunks is finished, process the group's data.
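
Boiled down, a compact sketch of that idea (same dummy data.csv and sorted_columns, without the logging; the full verbose version follows):

import pandas as pd

def run_logic(key, group):
    # some logic
    pass

def process_one_group(sorted_columns, already_processed):
    # one full pass over the file, accumulating the first key not processed yet
    accumulated = pd.DataFrame()
    chosen_key = None
    for chunk in pd.read_csv("data.csv", chunksize=3):
        for key, group in chunk.groupby(sorted_columns, sort=True):
            if key in already_processed:
                continue
            if chosen_key is None:
                chosen_key = key  # smallest unprocessed key, since the file is sorted
            if key == chosen_key:
                accumulated = pd.concat([accumulated, group])
    return chosen_key, accumulated

sorted_columns = ["sorted1", "sorted2", "sorted3"]
already_processed = set()
for _ in range(3):  # the dummy data.csv has 3 distinct group keys
    key, group = process_one_group(sorted_columns, already_processed)
    run_logic(key, group)
    already_processed.add(key)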

import pandas as pd

def run_logic(key, group):
    # some logic
    pass

def accumulate_nextGroup(alreadyProcessed_groups):
    past_accumulated_group = pd.DataFrame()
    pastChunk_groupKey = None
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv", iterator=True, chunksize=3)):
        groupby_data = chunk_df.groupby(sorted_columns, sort=True)
        for currentChunk_groupKey, currentChunk_group in groupby_data:
            if (pastChunk_groupKey is None or pastChunk_groupKey == currentChunk_groupKey)\
                    and currentChunk_groupKey not in alreadyProcessed_groups.keys():
                pastChunk_groupKey = currentChunk_groupKey
                past_accumulated_group = pd.concat(
                        [past_accumulated_group, currentChunk_group]
                )
                print(f'I am the chosen group({currentChunk_groupKey}) of the moment in the chunk {chunk_index+1}')
            else:
                if currentChunk_groupKey in alreadyProcessed_groups:
                    print(f'group({currentChunk_groupKey}) is not the chosen group because it was already processed')
                else:
                    print(f'group({currentChunk_groupKey}) is not the chosen group({pastChunk_groupKey}) yet :(')
    return pastChunk_groupKey, past_accumulated_group

alreadyProcessed_groups = {}
sorted_columns = ["sorted1", "sorted2", "sorted3"]
number_of_unique_groups = 3  # distinct group keys in the dummy data.csv
for iteration_in_df in range(number_of_unique_groups):
    groupKey, groupData = accumulate_nextGroup(alreadyProcessed_groups)
    run_logic(groupKey, groupData)
    alreadyProcessed_groups[groupKey] = "Already Processed"
    print(alreadyProcessed_groups)
    print(f"end of {iteration_in_df+1} iterations in df")
    print("*"*50)

OUTPUT SOLUTION 1:

I am the chosen group((1, 1, 1)) of the moment in the chunk 1
I am the chosen group((1, 1, 1)) of the moment in the chunk 2
group((2, 1, 1)) is not the chosen group((1, 1, 1)) yet :(
group((2, 1, 1)) is not the chosen group((1, 1, 1)) yet :(
group((3, 1, 1)) is not the chosen group((1, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed'}
end of 1 iterations in df
**************************************************
group((1, 1, 1)) is not the chosen group because it was already processed
group((1, 1, 1)) is not the chosen group because it was already processed
I am the chosen group((2, 1, 1)) of the moment in the chunk 2
I am the chosen group((2, 1, 1)) of the moment in the chunk 3
group((3, 1, 1)) is not the chosen group((2, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed'}
end of 2 iterations in df
**************************************************
group((1, 1, 1)) is not the chosen group because it was already processed
group((1, 1, 1)) is not the chosen group because it was already processed
group((2, 1, 1)) is not the chosen group because it was already processed
group((2, 1, 1)) is not the chosen group because it was already processed
I am the chosen group((3, 1, 1)) of the moment in the chunk 3
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed', (3, 1, 1): 'Already Processed'}
end of 3 iterations in df
**************************************************

UPDATE SOLUTION 2: for a scenario where we can't store all the group keys in a dictionary:

In the case where we can't store all the group keys in a dictionary, we need to use each group's relative index within each chunk to build a global reference index for each group. (Note that this solution is much more dense than the previous one.)

The main point of this solution is that we don't need the group key values to identify the groups. You can imagine each chunk as a node in a reversed linked list, where the first chunk points to null, the second chunk points to the first chunk, and so on... One iteration over the dataframe corresponds to one traversal of this linked list. For each step (processing a chunk), the only information you need to keep is the previous chunk's head, tail and size, and with only this information you can assign a unique index identifier to the group keys in any chunk.

Another important point is that, because the file is sorted, the reference index of the first group of a chunk will be either the index of the last group of the previous chunk, or that index + 1. This makes it possible to infer each group's global reference index chunk by chunk.
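
Boiled down, the rule from the previous paragraph could look like this tiny helper (hypothetical name, shown only to distill the idea; the full implementation follows):

def first_group_global_index(prev_last_index, prev_last_key, current_first_key):
    # global index of the first group of the current chunk
    if current_first_key == prev_last_key:
        return prev_last_index  # the group spills over the chunk boundary
    return prev_last_index + 1  # a brand new group starts in this chunk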

import pandas as pd
def run_logic(key, group):
    # some logic
    pass

def generate_currentChunkGroups_globalReferenceIdx(groupby_data,
        currentChunk_index, previousChunk_link):
    if currentChunk_index == 0:
        groupsIn_firstChunk=len(groupby_data.groups.keys())
        currentGroups_globalReferenceIdx = [(i,groupKey) 
                for i,(groupKey,_) in enumerate(groupby_data)]
    else:
        lastChunk_firstGroup, lastChunk_lastGroup, lastChunk_nGroups \
                = previousChunk_link 
        currentChunk_firstGroupKey = list(groupby_data.groups.keys())[0] 
        currentChunk_nGroups = len(groupby_data.groups.keys())

        lastChunk_lastGroupGlobalIdx, lastChunk_lastGroupKey \
                = lastChunk_lastGroup
        if currentChunk_firstGroupKey == lastChunk_lastGroupKey:
            currentChunk_firstGroupGlobalReferenceIdx =  lastChunk_lastGroupGlobalIdx
        else:
            currentChunk_firstGroupGlobalReferenceIdx =  lastChunk_lastGroupGlobalIdx + 1

        currentGroups_globalReferenceIdx = [
                (currentChunk_firstGroupGlobalReferenceIdx+i, groupKey)
                    for (i,groupKey) in enumerate(groupby_data.groups.keys())
                    ]

    next_previousChunk_link = (currentGroups_globalReferenceIdx[0],
            currentGroups_globalReferenceIdx[-1],
            len(currentGroups_globalReferenceIdx)
    )
    return currentGroups_globalReferenceIdx, next_previousChunk_link   

def accumulate_nextGroup(countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator):
    previousChunk_link = None
    currentIdx_beingProcessed = countOf_alreadyProcessedGroups
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv",iterator=True, chunksize=3)):
        print(f'ITER:{iteration_in_df} CHUNK:{chunk_index} InfoPrevChunk:{previousChunk_link} lastProcessed_chunk:{lastChunk_index}')
        if (lastChunk_index !=  None) and (chunk_index < lastChunk_index):
            groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
            currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                    = generate_currentChunkGroups_globalReferenceIdx(
                            groupby_data, chunk_index, previousChunk_link
                            )
        elif((lastChunk_index == None) or (chunk_index >= lastChunk_index)):
            if (chunk_index == lastChunk_index):
                groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                        = generate_currentChunkGroups_globalReferenceIdx(
                                groupby_data, chunk_index, previousChunk_link
                                )
                currentChunkGroupGlobalIndexes = [GlobalIndex \
                        for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
                if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup 
                            for tup in currentChunkGroups_globalReferenceIdx
                            if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                            [dataframe_accumulator, currentChunk_group]
                                                     )
            else: 
                groupby_data = chunk_df.groupby(sorted_columns, sort=True) 
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                        = generate_currentChunkGroups_globalReferenceIdx(
                                groupby_data, chunk_index, previousChunk_link
                                )
                currentChunkGroupGlobalIndexes = [GlobalIndex \
                        for (GlobalIndex,_) in currentChunkGroups_globalReferenceIdx]
                if((lastChunk_index is None) or (lastChunk_index <= chunk_index)):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup 
                            for tup in currentChunkGroups_globalReferenceIdx
                            if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                            [dataframe_accumulator, currentChunk_group]
                                                     )
                else:
                    countOf_alreadyProcessedGroups+=1
                    lastChunk_index = chunk_index-1
                    break
        previousChunk_link = next_previousChunk_link
    print(f'Done with chunks for group of global index:{currentIdx_beingProcessed} corresponding to groupKey:{currentGroupKey_beingProcessed}')
    return countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator, currentGroupKey_beingProcessed

sorted_columns = ["sorted1","sorted2","sorted3"]
number_of_unique_groups = 3  # distinct group keys in the dummy data.csv
lastChunk_index = None 
for iteration_in_df in range(number_of_unique_groups):  
    dataframe_accumulator = pd.DataFrame()
    countOf_alreadyProcessedGroups,lastChunk_index, group_data, currentGroupKey_Processed=\
            accumulate_nextGroup(
                    iteration_in_df, lastChunk_index, dataframe_accumulator
                                )
    run_logic(currentGroupKey_Processed, group_data)  # pass the accumulated group, not the (still empty) outer accumulator
    print(f"end of iteration number {iteration_in_df+1} in the df and processed {currentGroupKey_Processed}")
    print(group_data)
    print("*"*50)

OUTPUT SOLUTION 2:

ITER:0 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:None
ITER:0 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:0
ITER:0 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:0 corresponding to groupKey:(1, 1, 1)
end of iteration number 1 in the df and processed (1, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3 other4 
0        1        1        1   'a'    'a'    'a'   'a'  
1        1        1        1   'a'    'a'    'a'     'a'
2        1        1        1   'a'    'a'    'a'     'a'
3        1        1        1   'a'    'a'    'a'   'a'  
**************************************************
ITER:1 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:1
ITER:1 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:1
ITER:1 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:1 corresponding to groupKey:(2, 1, 1)
end of iteration number 2 in the df and processed (2, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3  other4 
4        2        1        1   'a'    'a'    'a'    'a'  
5        2        1        1   'd'    'd'    'd'   'd'   
6        2        1        1   'd'    'd'    'd'   'a'   
**************************************************
ITER:2 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:2
ITER:2 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:2
ITER:2 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:2
Done with chunks for group of global index:2 corresponding to groupKey:(3, 1, 1)
end of iteration number 3 in the df and processed (3, 1, 1)
   sorted1  sorted2  sorted3 othe1 other2 other3 other4 
7        3        1        1   'e'    'e'    'e'   'e'  
8        3        1        1   'b'    'b'    'b'    'b' 
**************************************************

Upvotes: 7
