Reputation: 1532
I have a large dataframe (several million rows).
I want to be able to do a groupby operation on it, but just grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.
The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, as the function calculates a result based on one row at a time. (Conceptually at least; in reality it's vectorized.)
I've come up with something like this:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
But this seems very long-winded, and doesn't guarantee equal sized chunks. Especially if the index is sparse or non-integer or whatever.
Any suggestions for a better way?
Thanks!
Upvotes: 121
Views: 127041
Reputation: 511
A generator version of the chunk function is presented below. Moreover this version works with custom index of the pd.DataFrame or pd.Series (e.g. float type index)
import numpy as np
import pandas as pd
df_sz = 14
df = pd.DataFrame(
np.random.rand(df_sz,4),
index=np.linspace(0., 10., num=df_sz),
columns=['a', 'b', 'c', 'd'],
)
def chunker(seq, size):
for pos in range(0, len(seq), size):
yield seq.iloc[pos:pos + size]
chunk_size = 6
for i in chunker(df, chunk_size):
print(i)
chnk = chunker(df, chunk_size)
print('\n', chnk)
print(next(chnk))
print(next(chnk))
print(next(chnk))
The output is
a b c d 0.000000 0.560627 0.665897 0.683055 0.611884 0.769231 0.241871 0.357080 0.841945 0.340778 1.538462 0.065009 0.234621 0.250644 0.552410 2.307692 0.431394 0.235463 0.755084 0.114852 3.076923 0.173748 0.189739 0.148856 0.031171 3.846154 0.772352 0.697762 0.557806 0.254476 a b c d 4.615385 0.901200 0.977844 0.250316 0.957408 5.384615 0.400939 0.520841 0.863015 0.177043 6.153846 0.356927 0.344220 0.863067 0.400573 6.923077 0.375417 0.156420 0.897889 0.810083 7.692308 0.666371 0.152800 0.482446 0.955556 8.461538 0.242711 0.421591 0.005223 0.200596 a b c d 9.230769 0.735748 0.402639 0.527825 0.595952 10.000000 0.420209 0.365231 0.966829 0.514409 - generator object chunker at 0x7f503c9d0ba0 First "next()": a b c d 0.000000 0.560627 0.665897 0.683055 0.611884 0.769231 0.241871 0.357080 0.841945 0.340778 1.538462 0.065009 0.234621 0.250644 0.552410 2.307692 0.431394 0.235463 0.755084 0.114852 3.076923 0.173748 0.189739 0.148856 0.031171 3.846154 0.772352 0.697762 0.557806 0.254476 Second "next()": a b c d 4.615385 0.901200 0.977844 0.250316 0.957408 5.384615 0.400939 0.520841 0.863015 0.177043 6.153846 0.356927 0.344220 0.863067 0.400573 6.923077 0.375417 0.156420 0.897889 0.810083 7.692308 0.666371 0.152800 0.482446 0.955556 8.461538 0.242711 0.421591 0.005223 0.200596 Third "next()": a b c d 9.230769 0.735748 0.402639 0.527825 0.595952 10.000000 0.420209 0.365231 0.966829 0.514409
Upvotes: 15
Reputation: 13269
Use numpy's array_split():
import numpy as np
import pandas as pd
data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
assert len(chunk) == len(data) / 5, "This assert may fail for the last chunk if data lenght isn't divisible by 5"
Upvotes: 160
Reputation: 20300
Another approach..
# .. load df ..
CHUNK_SIZE = 100000
for chunk_num in range(len(df) // CHUNK_SIZE + 1):
start_index = chunk_num*CHUNK_SIZE
end_index = min(chunk_num*CHUNK_SIZE + CHUNK_SIZE, len(df))
chunk = df[start_index:end_index]
# .. do calculaton on chunk here ..
Upvotes: 3
Reputation: 5635
Your suggestion to use groupby
is quite good, but you should rather use np.arange(len(dataframe)) // batch_size
than dataframe.index
, since the index can be non-integer and non-consequtive.
I've run some benchmarks on the answers given. The top-voted one is horribly slow. Please consider using the accepted solution:
data.groupby(np.arange(len(dataframe)) // batch_size)
Benchmark code:
import numpy as np
import pandas as pd
import time
from tqdm.auto import tqdm
#@markdown # Create a properly funcky `pd.DataFrame`
data = pd.DataFrame([
{
'x': np.random.randint(23515243),
'y': 364274*np.random.rand()-134562,
'z': ''.join(np.random.choice(list('`1234567890-=qwertyuiop[]\asdfghjkl;\'zxcvbnm,./~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:"ZXCVBNM<>?'), np.random.randint(54,89), replace=True)),
}
for _ in tqdm(range(22378))
])
data.index = ['a'] * len(data)
data = pd.concat([data] * 100)
batch_size = 64
times = []
t0 = time.time()
for chunk in np.array_split(data, (len(data) + batch_size - 1) // batch_size):
pass
times.append({'method': 'np.array_split', 'time': -t0 + time.time()})
t0 = time.time()
for _, chunk in data.groupby(np.arange(len(data)) // batch_size):
pass
times.append({'method': 'groupby', 'time': -t0 + time.time()})
def chunker(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
t0 = time.time()
for chunk in chunker(data, batch_size):
pass
times.append({'method': '[]-syntax', 'time': -t0 + time.time()})
# t0 = time.time()
# for chunk in bz.odo(data, target=bz.chunks(pd.DataFrame), chunksize=batch_size):
# pass
# times.append({'method': 'bz.odo', 'time': -t0 + time.time()})
def chunker(seq, size):
for pos in range(0, len(seq), size):
yield seq.iloc[pos:pos + size]
t0 = time.time()
for i in chunker(data, batch_size):
pass
times.append({'method': '.iloc[]-syntax', 'time': -t0 + time.time()})
pd.DataFrame(times)
Upvotes: 3
Reputation: 551
import pandas as pd
def batch(iterable, batch_number=10):
"""
split an iterable into mini batch with batch length of batch_number
supports batch of a pandas dataframe
usage:
for i in batch([1,2,3,4,5], batch_number=2):
print(i)
for idx, mini_data in enumerate(batch(df, batch_number=10)):
print(idx)
print(mini_data)
"""
l = len(iterable)
for idx in range(0, l, batch_number):
if isinstance(iterable, pd.DataFrame):
# dataframe can't split index label, should iter according index
yield iterable.iloc[idx:min(idx+batch_number, l)]
else:
yield iterable[idx:min(idx+batch_number, l)]
Upvotes: 5
Reputation: 352979
In practice, you can't guarantee equal-sized chunks. The number of rows (N) might be prime, in which case you could only get equal-sized chunks at 1 or N. Because of this, real-world chunking typically uses a fixed size and allows for a smaller chunk at the end. I tend to pass an array to groupby
. Starting from:
>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 13 0.510273 0.051608 0.230402 0.756921
0 14 0.950544 0.576539 0.642602 0.907850
[15 rows x 5 columns]
where I've deliberately made the index uninformative by setting it to 0, we simply decide on our size (here 10) and integer-divide an array by it:
>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
... print(k,g)
...
0 0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 8 0.241049 0.246149 0.241935 0.563428
0 9 0.493819 0.918858 0.193236 0.266257
[10 rows x 5 columns]
1 0 1 2 3 4
0 10 0.037693 0.370789 0.369117 0.401041
0 11 0.721843 0.862295 0.671733 0.605006
[...]
0 14 0.950544 0.576539 0.642602 0.907850
[5 rows x 5 columns]
Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b]
to ignore the index values and access data by position.
Upvotes: 60
Reputation: 1124
A sign of a good environment is many choices, so I'll add this from Anaconda Blaze, really using Odo
import blaze as bz
import pandas as pd
df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})
for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
# Do stuff with chunked dataframe
Upvotes: 13
Reputation: 1084
I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.
Here's a short example from that thread, which might do something like what you want:
import numpy as np
import pandas as pds
df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])
def chunker(seq, size):
return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))
for i in chunker(df,5):
print i
Which gives you something like this:
a b c d
0 0.860574 0.059326 0.339192 0.786399
1 0.029196 0.395613 0.524240 0.380265
2 0.235759 0.164282 0.350042 0.877004
3 0.545394 0.881960 0.994079 0.721279
4 0.584504 0.648308 0.655147 0.511390
a b c d
5 0.276160 0.982803 0.451825 0.845363
6 0.728453 0.246870 0.515770 0.343479
7 0.971947 0.278430 0.006910 0.888512
8 0.044888 0.875791 0.842361 0.890675
9 0.200563 0.246080 0.333202 0.574488
a b c d
10 0.971125 0.106790 0.274001 0.960579
11 0.722224 0.575325 0.465267 0.258976
12 0.574039 0.258625 0.469209 0.886768
13 0.915423 0.713076 0.073338 0.622967
I hope that helps.
EDIT
In this case, I used this function with pool of processors in (approximately) this manner:
from multiprocessing import Pool
nprocs = 4
pool = Pool(nprocs)
for chunk in chunker(df, nprocs):
data = pool.map(myfunction, chunk)
data.domorestuff()
I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
Upvotes: 64