Reputation: 3996
I have a large CSV file that is sorted by a few of its columns; let's call these columns sorted_columns.
I want to perform a groupby on these sorted_columns and apply some logic to each of these groups.
The file does not fit completely into memory so I want to read it in chunks and perform a groupby
on each chunk.
The thing I have noticed is that the order of the groups is not preserved even though the file is already sorted by these columns.
Essentially, this is what I am trying to do:
import pandas as pd

def run_logic(key, group):
    # some logic
    pass

# df is the chunked reader, e.g. pd.read_csv(filename, chunksize=...),
# and sorted_columns is the list of columns the file is sorted by
last_group = pd.DataFrame()
last_key = None

for chunk_df in df:
    grouped_by_df = chunk_df.groupby(sorted_columns, sort=True)
    for key, group in grouped_by_df:
        if last_key is None or last_key == key:
            last_key = key
            last_group = pd.concat([last_group, group])
        else:  # last_key != key
            run_logic(last_key, last_group)
            last_key = key
            last_group = group.copy()

run_logic(last_key, last_group)
But this does not work, because groupby does not promise that the order of the groups is preserved. If the same key exists in two consecutive chunks, there is no guarantee that it will be the last group of the first chunk and the first group of the next one.
I tried changing the groupby to use sort=False, and also tried changing the order of the columns, but it didn't help.
Does anyone have any idea how to preserve the order of the groups, given that the keys are already sorted in the original file? Or is there another way to read a complete group at once from the file?
Upvotes: 2
Views: 688
Reputation: 492
itertools.groupby will return the key and an iterator over all the values grouped by that key. If your file is already sorted by your desired key, you are good to go: the groupby function will handle almost everything for you.
From the documentation:
The operation of groupby() is similar to the uniq filter in Unix. It generates a break or new group every time the value of the key function changes (which is why it is usually necessary to have sorted the data using the same key function). That behavior differs from SQL's GROUP BY which aggregates common elements regardless of their input order.
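To see what that means in practice, here is a tiny sketch (the keys and values are made up for illustration): groupby starts a new group every time the key changes, so an unsorted input can emit the same key more than once, while a key-sorted input emits each key exactly once.

from itertools import groupby

rows = [("a", 1), ("a", 2), ("b", 3), ("a", 4)]   # note "a" reappears after "b"

# Unsorted input: "a" comes out as two separate groups.
print([(k, [v for _, v in g]) for k, g in groupby(rows, key=lambda r: r[0])])
# [('a', [1, 2]), ('b', [3]), ('a', [4])]

# Input sorted by the key: each key comes out exactly once.
print([(k, [v for _, v in g]) for k, g in groupby(sorted(rows), key=lambda r: r[0])])
# [('a', [1, 2, 4]), ('b', [3])]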
run_logic is whatever business logic you want to apply to the group of records. This example simply counts the number of observations in the iterator.
data_iter simply emits one row of the CSV at a time. As long as your file is sorted by the desired fields, you do not need to read the entire file into memory.
chunks uses itertools.groupby to group the input iterator on the first 3 fields of each row. It yields the key and the corresponding iterator of values associated with that key.
#!/usr/bin/env python3
import csv
from itertools import groupby

def run_logic(key, group):
    # business logic placeholder: count the records in the group
    cntr = 0
    for rec in group:
        cntr = cntr + 1
    return (key, cntr)

def data_iter(filename):
    # yield the CSV one row at a time
    with open(filename, "r") as fin:
        csvin = csv.reader(fin)
        for row in csvin:
            yield row

def chunks(diter):
    # group consecutive rows that share the same first 3 fields
    for chunk, iter_ in groupby(diter, key=lambda x: x[0:3]):
        yield (chunk, iter_)

if __name__ == "__main__":
    csviter = data_iter("test.csv")
    chunk_iter = chunks(csviter)
    for chunk, iter_ in chunk_iter:
        print(run_logic(chunk, iter_))
['1', '1', '1', 'a', 'a', 'a', 'a']
['1', '1', '1', 'b', 'b', 'b', 'b']
['1', '1', '1', 'c', 'c', 'c', 'c']
['1', '1', '1', 'd', 'd', 'd', 'd']
['1', '1', '1', 'e', 'e', 'e', 'e']
['2', '1', '1', 'a', 'a', 'a', 'a']
['2', '1', '1', 'd', 'd', 'd', 'd']
['2', '1', '1', 'e', 'e', 'e', 'e']
['2', '1', '1', 'b', 'b', 'b', 'b']
['2', '1', '1', 'c', 'c', 'c', 'c']
['3', '1', '1', 'e', 'e', 'e', 'e']
['3', '1', '1', 'b', 'b', 'b', 'b']
['3', '1', '1', 'c', 'c', 'c', 'c']
['3', '1', '1', 'a', 'a', 'a', 'a']
['3', '1', '1', 'd', 'd', 'd', 'd']
Group: ['1', '1', '1']
['1', '1', '1', 'a', 'a', 'a', 'a']
['1', '1', '1', 'b', 'b', 'b', 'b']
['1', '1', '1', 'c', 'c', 'c', 'c']
['1', '1', '1', 'd', 'd', 'd', 'd']
['1', '1', '1', 'e', 'e', 'e', 'e']
Group: ['2', '1', '1']
['2', '1', '1', 'a', 'a', 'a', 'a']
['2', '1', '1', 'd', 'd', 'd', 'd']
['2', '1', '1', 'e', 'e', 'e', 'e']
['2', '1', '1', 'b', 'b', 'b', 'b']
['2', '1', '1', 'c', 'c', 'c', 'c']
Group: ['3', '1', '1']
['3', '1', '1', 'e', 'e', 'e', 'e']
['3', '1', '1', 'b', 'b', 'b', 'b']
['3', '1', '1', 'c', 'c', 'c', 'c']
['3', '1', '1', 'a', 'a', 'a', 'a']
['3', '1', '1', 'd', 'd', 'd', 'd']
Group: ['1', '1', '1']
(['1', '1', '1'], 5)
Group: ['2', '1', '1']
(['2', '1', '1'], 5)
Group: ['3', '1', '1']
(['3', '1', '1'], 5)
Upvotes: 4
Reputation: 2657
I believe that the essence of your problem is that you're trying to aggregate each group with only one pass over the dataframe. There is a tradeoff between how many groups you fit in memory and how many times you need to read the dataframe.
NOTE: I am purposefully showing verbose code to convey the idea that it is necessary to iterate over the df many times. Both solutions got relatively complex, but they still achieve what is expected. There are many aspects of the code which can be improved; any help refactoring it is appreciated.
I will use this dummy "data.csv" file to exemplify my solutions. If you save data.csv in the same directory as the script, you can just copy and paste the solutions and run them.
sorted1,sorted2,sorted3,othe1,other2,other3,other4
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
1, 1, 1, 'a', 'a', 'a', 'a'
2, 1, 1, 'a', 'a', 'a', 'a'
2, 1, 1, 'd', 'd', 'd', 'd'
2, 1, 1, 'd', 'd', 'd', 'a'
3, 1, 1, 'e', 'e', 'e', 'e'
3, 1, 1, 'b', 'b', 'b', 'b'
SOLUTION 1: an initial solution, for the scenario where we can store all the group keys:
Accumulate all the rows of a group first, and process them afterwards.
Essentially I would do the following: for each pass over the df (in chunks), pick one group (or several, if memory allows). Check that it has not been processed yet by looking it up in a dictionary of already processed group keys, then accumulate the selected group's rows while iterating over each chunk. When the pass over all the chunks is finished, process the accumulated group data, as condensed in the sketch below.
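Here is a condensed sketch of that idea. It assumes the dummy data.csv, chunksize=3 and the sorted_columns from this example; next_unprocessed_group is a hypothetical helper name, and run_logic is left as a placeholder. Each call scans the whole file once and returns the rows of one not-yet-processed group:

import pandas as pd

sorted_columns = ["sorted1", "sorted2", "sorted3"]

def next_unprocessed_group(processed_keys):
    # One full pass over the CSV: lock onto the first group key that has
    # not been processed yet and accumulate all of its rows.
    target_key, accumulated = None, pd.DataFrame()
    for chunk in pd.read_csv("data.csv", chunksize=3):
        for key, group in chunk.groupby(sorted_columns, sort=True):
            if key in processed_keys:
                continue
            if target_key is None:
                target_key = key                   # first new key of this pass
            if key == target_key:
                accumulated = pd.concat([accumulated, group])
    return target_key, accumulated

processed_keys = set()
while True:
    key, group_df = next_unprocessed_group(processed_keys)
    if key is None:                                # every group has been processed
        break
    # run_logic(key, group_df)  # whatever per-group business logic is needed
    processed_keys.add(key)

The verbose version below does the same thing, with diagnostic prints showing which group is chosen in each chunk: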
import pandas as pd

def run_logic(key, group):
    # some logic
    pass

def accumulate_nextGroup(alreadyProcessed_groups):
    past_accumulated_group = pd.DataFrame()
    pastChunk_groupKey = None
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv", iterator=True, chunksize=3)):
        groupby_data = chunk_df.groupby(sorted_columns, sort=True)
        for currentChunk_groupKey, currentChunk_group in groupby_data:
            if (pastChunk_groupKey is None or pastChunk_groupKey == currentChunk_groupKey)\
                    and currentChunk_groupKey not in alreadyProcessed_groups.keys():
                pastChunk_groupKey = currentChunk_groupKey
                past_accumulated_group = pd.concat(
                    [past_accumulated_group, currentChunk_group]
                )
                print(f'I am the choosen group({currentChunk_groupKey}) of the moment in the chunk {chunk_index+1}')
            else:
                if currentChunk_groupKey in alreadyProcessed_groups:
                    print(f'group({currentChunk_groupKey}) is not the choosen group because it was already processed')
                else:
                    print(f'group({currentChunk_groupKey}) is not the choosen group({pastChunk_groupKey}) yet :(')
    return pastChunk_groupKey, past_accumulated_group

alreadyProcessed_groups = {}
sorted_columns = ["sorted1", "sorted2", "sorted3"]
number_of_unique_groups = 3

for iteration_in_df in range(number_of_unique_groups):
    groupKey, groupData = accumulate_nextGroup(alreadyProcessed_groups)
    run_logic(groupKey, groupData)
    alreadyProcessed_groups[groupKey] = "Already Processed"
    print(alreadyProcessed_groups)
    print(f"end of {iteration_in_df+1} iterations in df")
    print("*"*50)
OUTPUT SOLUTION 1:
I am the choosen group((1, 1, 1)) of the moment in the chunk 1
I am the choosen group((1, 1, 1)) of the moment in the chunk 2
group((2, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
group((2, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
group((3, 1, 1)) is not the choosen group((1, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed'}
end of 1 iterations in df
**************************************************
group((1, 1, 1)) is not the choosen group because it was already processed
group((1, 1, 1)) is not the choosen group because it was already processed
I am the choosen group((2, 1, 1)) of the moment in the chunk 2
I am the choosen group((2, 1, 1)) of the moment in the chunk 3
group((3, 1, 1)) is not the choosen group((2, 1, 1)) yet :(
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed'}
end of 2 iterations in df
**************************************************
group((1, 1, 1)) is not the choosen group because it was already processed
group((1, 1, 1)) is not the choosen group because it was already processed
group((2, 1, 1)) is not the choosen group because it was already processed
group((2, 1, 1)) is not the choosen group because it was already processed
I am the choosen group((3, 1, 1)) of the moment in the chunk 3
{(1, 1, 1): 'Already Processed', (2, 1, 1): 'Already Processed', (3, 1, 1): 'Already Processed'}
end of 3 iterations in df
**************************************************
UPDATE SOLUTION 2: for the scenario where we can't store all the group keys in a dictionary:
In that case, we need to use the relative index of each group within each chunk to build a global reference index for each group. (Note that this solution is much denser than the previous one.)
The main point of this solution is that we don't need the group key values to identify the groups. You can imagine each chunk as a node in a reversed linked list, where the first chunk points to null, the second chunk points to the first chunk, and so on. One pass over the dataframe corresponds to one traversal of this linked list. At each step (processing a chunk), the only information you need to keep is the previous chunk's head, tail, and size; with only that, you can assign a unique index identifier to the group keys in any chunk.
The other important point is that, because the file is sorted, the global reference index of the first group of a chunk is either equal to the index of the previous chunk's last group, or that index plus 1. This makes it possible to chain the global reference indexes from chunk to chunk, as the sketch below illustrates.
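Here is a minimal sketch of just that chunk-to-chunk index chaining, separate from the accumulation logic; global_group_indexes is a hypothetical helper, and the hard-coded chunk keys mirror the dummy data.csv read with chunksize=3:

def global_group_indexes(chunk_group_keys, previous_last):
    # Assign a global index to each group key found in the current chunk.
    # chunk_group_keys: this chunk's group keys, in sorted order.
    # previous_last: (global_index, key) of the previous chunk's last group,
    #                or None for the very first chunk.
    if previous_last is None:
        start = 0
    elif chunk_group_keys[0] == previous_last[1]:
        start = previous_last[0]        # first group continues the previous chunk's last group
    else:
        start = previous_last[0] + 1    # first group is a brand-new group
    return [(start + i, key) for i, key in enumerate(chunk_group_keys)]

# Group keys per chunk for the dummy data.csv with chunksize=3:
chunk_keys = [[(1, 1, 1)], [(1, 1, 1), (2, 1, 1)], [(2, 1, 1), (3, 1, 1)]]
previous_last = None
for keys in chunk_keys:
    indexed = global_group_indexes(keys, previous_last)
    print(indexed)
    previous_last = indexed[-1]
# [(0, (1, 1, 1))]
# [(0, (1, 1, 1)), (1, (2, 1, 1))]
# [(1, (2, 1, 1)), (2, (3, 1, 1))]

The full solution below embeds this same chaining while also accumulating the rows of the group whose global index is currently being processed: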
import pandas as pd

def run_logic(key, group):
    # some logic
    pass

def generate_currentChunkGroups_globalReferenceIdx(groupby_data,
        currentChunk_index, previousChunk_link):
    if currentChunk_index == 0:
        groupsIn_firstChunk = len(groupby_data.groups.keys())
        currentGroups_globalReferenceIdx = [(i, groupKey)
            for i, (groupKey, _) in enumerate(groupby_data)]
    else:
        lastChunk_firstGroup, lastChunk_lastGroup, lastChunk_nGroups \
            = previousChunk_link
        currentChunk_firstGroupKey = list(groupby_data.groups.keys())[0]
        currentChunk_nGroups = len(groupby_data.groups.keys())
        lastChunk_lastGroupGlobalIdx, lastChunk_lastGroupKey \
            = lastChunk_lastGroup
        if currentChunk_firstGroupKey == lastChunk_lastGroupKey:
            currentChunk_firstGroupGlobalReferenceIdx = lastChunk_lastGroupGlobalIdx
        else:
            currentChunk_firstGroupGlobalReferenceIdx = lastChunk_lastGroupGlobalIdx + 1
        currentGroups_globalReferenceIdx = [
            (currentChunk_firstGroupGlobalReferenceIdx + i, groupKey)
            for (i, groupKey) in enumerate(groupby_data.groups.keys())
        ]
    next_previousChunk_link = (currentGroups_globalReferenceIdx[0],
                               currentGroups_globalReferenceIdx[-1],
                               len(currentGroups_globalReferenceIdx))
    return currentGroups_globalReferenceIdx, next_previousChunk_link

def accumulate_nextGroup(countOf_alreadyProcessedGroups, lastChunk_index,
                         dataframe_accumulator):
    previousChunk_link = None
    currentIdx_beingProcessed = countOf_alreadyProcessedGroups
    for chunk_index, chunk_df in enumerate(pd.read_csv("data.csv", iterator=True, chunksize=3)):
        print(f'ITER:{iteration_in_df} CHUNK:{chunk_index} InfoPrevChunk:{previousChunk_link} lastProcessed_chunk:{lastChunk_index}')
        if (lastChunk_index != None) and (chunk_index < lastChunk_index):
            # chunks before the last processed one: only update the index chain
            groupby_data = chunk_df.groupby(sorted_columns, sort=True)
            currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                = generate_currentChunkGroups_globalReferenceIdx(
                    groupby_data, chunk_index, previousChunk_link
                )
        elif (lastChunk_index == None) or (chunk_index >= lastChunk_index):
            if chunk_index == lastChunk_index:
                groupby_data = chunk_df.groupby(sorted_columns, sort=True)
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                    = generate_currentChunkGroups_globalReferenceIdx(
                        groupby_data, chunk_index, previousChunk_link
                    )
                currentChunkGroupGlobalIndexes = [GlobalIndex
                    for (GlobalIndex, _) in currentChunkGroups_globalReferenceIdx]
                if (lastChunk_index is None) or (lastChunk_index <= chunk_index):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup
                        for tup in currentChunkGroups_globalReferenceIdx
                        if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                        [dataframe_accumulator, currentChunk_group]
                    )
            else:
                groupby_data = chunk_df.groupby(sorted_columns, sort=True)
                currentChunkGroups_globalReferenceIdx, next_previousChunk_link \
                    = generate_currentChunkGroups_globalReferenceIdx(
                        groupby_data, chunk_index, previousChunk_link
                    )
                currentChunkGroupGlobalIndexes = [GlobalIndex
                    for (GlobalIndex, _) in currentChunkGroups_globalReferenceIdx]
                if (lastChunk_index is None) or (lastChunk_index <= chunk_index):
                    lastChunk_index = chunk_index
                if currentIdx_beingProcessed in currentChunkGroupGlobalIndexes:
                    currentGroupKey_beingProcessed = [tup
                        for tup in currentChunkGroups_globalReferenceIdx
                        if tup[0] == currentIdx_beingProcessed][0][1]
                    currentChunk_group = groupby_data.get_group(currentGroupKey_beingProcessed)
                    dataframe_accumulator = pd.concat(
                        [dataframe_accumulator, currentChunk_group]
                    )
                else:
                    # the group being processed ended in an earlier chunk: stop this pass
                    countOf_alreadyProcessedGroups += 1
                    lastChunk_index = chunk_index - 1
                    break
        previousChunk_link = next_previousChunk_link
    print(f'Done with chunks for group of global index:{currentIdx_beingProcessed} corresponding to groupKey:{currentGroupKey_beingProcessed}')
    return countOf_alreadyProcessedGroups, lastChunk_index, dataframe_accumulator, currentGroupKey_beingProcessed

sorted_columns = ["sorted1", "sorted2", "sorted3"]
number_of_unique_groups = 3
lastChunk_index = None

for iteration_in_df in range(number_of_unique_groups):
    dataframe_accumulator = pd.DataFrame()
    countOf_alreadyProcessedGroups, lastChunk_index, group_data, currentGroupKey_Processed = \
        accumulate_nextGroup(
            iteration_in_df, lastChunk_index, dataframe_accumulator
        )
    run_logic(currentGroupKey_Processed, dataframe_accumulator)
    print(f"end of iteration number {iteration_in_df+1} in the df and processed {currentGroupKey_Processed}")
    print(group_data)
    print("*"*50)
OUTPUT SOLUTION 2:
ITER:0 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:None
ITER:0 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:0
ITER:0 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:0 corresponding to groupKey:(1, 1, 1)
end of iteration number 1 in the df and processed (1, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
0 1 1 1 'a' 'a' 'a' 'a'
1 1 1 1 'a' 'a' 'a' 'a'
2 1 1 1 'a' 'a' 'a' 'a'
3 1 1 1 'a' 'a' 'a' 'a'
**************************************************
ITER:1 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:1
ITER:1 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:1
ITER:1 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:1
Done with chunks for group of global index:1 corresponding to groupKey:(2, 1, 1)
end of iteration number 2 in the df and processed (2, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
4 2 1 1 'a' 'a' 'a' 'a'
5 2 1 1 'd' 'd' 'd' 'd'
6 2 1 1 'd' 'd' 'd' 'a'
**************************************************
ITER:2 CHUNK:0 InfoPrevChunk:None lastProcessed_chunk:2
ITER:2 CHUNK:1 InfoPrevChunk:((0, (1, 1, 1)), (0, (1, 1, 1)), 1) lastProcessed_chunk:2
ITER:2 CHUNK:2 InfoPrevChunk:((0, (1, 1, 1)), (1, (2, 1, 1)), 2) lastProcessed_chunk:2
Done with chunks for group of global index:2 corresponding to groupKey:(3, 1, 1)
end of iteration number 3 in the df and processed (3, 1, 1)
sorted1 sorted2 sorted3 othe1 other2 other3 other4
7 3 1 1 'e' 'e' 'e' 'e'
8 3 1 1 'b' 'b' 'b' 'b'
**************************************************
Upvotes: 7