Reputation: 44381
(This question is related to this one and this one, but those are pre-walking the generator, which is exactly what I want to avoid)
I would like to split a generator into chunks. The requirements are:

- do not pad the chunks: if the number of remaining elements is less than the chunk size, the last chunk must be smaller;
- do not walk the generator beforehand: computing the elements is expensive, and it must only be done by the consuming function, not by the chunker;
- which implies, of course: do not accumulate in memory (no lists).

I have tried the following code:
def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        yield head(i, size)

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 3)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)
And this somehow works:
Chunk 0, value 0
Chunk 0, value 1
Chunk 0, value 2
Chunk 1, value 3
Chunk 1, value 4
Chunk 1, value 5
Chunk 2, value 6
^CTraceback (most recent call last):
  File "xxxx.py", line 15, in <module>
    for el in chunk:
  File "xxxx.py", line 2, in head
    for cnt, el in enumerate(iterable):
KeyboardInterrupt
Buuuut ... it never stops (I have to press ^C) because of the while True. I would like to stop that loop whenever the generator has been consumed, but I do not know how to detect that situation. I have tried raising an exception:
class NoMoreData(Exception):
    pass

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break
    if cnt == 0: raise NoMoreData()

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        try:
            yield head(i, size)
        except NoMoreData:
            break

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 2)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)
But then the exception is only raised in the context of the consumer, which is not what I want (I want to keep the consumer code clean):
Chunk 0, value 0
Chunk 0, value 1
Chunk 0, value 2
Chunk 1, value 3
Chunk 1, value 4
Chunk 1, value 5
Chunk 2, value 6
Traceback (most recent call last):
  File "xxxx.py", line 22, in <module>
    for el in chunk:
  File "xxxx.py", line 9, in head
    if cnt == 0: raise NoMoreData()
__main__.NoMoreData
How can I detect that the generator is exhausted in the chunks function, without walking it?
Upvotes: 84
Views: 35050
Reputation: 155438
To push the most possible work to the C layer (which is often the fastest solution in CPython), and still guarantee unconsumed chunks are discarded, a modified form of Moses' solution is the best option (groupby does the work of cleaning up elements not consumed by the prior group):
from functools import partial
from itertools import cycle, groupby
from operator import itemgetter

def chunks(iterable, size=10):
    # Make a cheap iterator that will group in groups of size elements
    c = cycle((False,) * size + (True,) * size)
    # groupby will pass an element to next as its second argument, but because
    # c is an infinite iterator, the second argument will never get used
    return map(itemgetter(1), groupby(iterable, partial(next, c)))
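For illustration, a minimal usage sketch (the sample input is mine; each chunk must be consumed before requesting the next, since groupby shares the underlying iterator):

for n, chunk in enumerate(chunks(range(7), 3)):
    print(n, list(chunk))
# 0 [0, 1, 2]
# 1 [3, 4, 5]
# 2 [6]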
If you never need to leave a group unconsumed, you can get a little faster with a modified form of tobias_k's answer:

from functools import partial
from itertools import chain, islice

def chunks(iterable, size=10):
    it = iter(iterable)
    make_islice = partial(islice, it, size - 1)
    for first in it:
        yield chain((first,), make_islice())
But the incremental benefit of this is small (see timings below), so I'd suggest only using it if you want the first unconsumed element from a group to become the first element of the next group; otherwise, stick to the groupby-based solution that gives you better guarantees.

All that said, are you sure the groups need to be lazy? There are much better/simpler solutions if each chunk can be produced as a tuple, consuming each chunk eagerly but the input as a whole lazily. The best options by Python version follow:
As of Python 3.12, there's a built-in for this, itertools.batched. The arguments are the reverse of the chunker recipes below, but it behaves the same way (batching into tuples of length n, with the final batch potentially being incomplete):
from itertools import batched

for batch in batched('ABCDEFG', 3):
    print(batch)
will output:
('A', 'B', 'C')
('D', 'E', 'F')
('G',)
The timings at the bottom of the answer show that it's by far the best solution if you can rely on 3.12+ and can produce the groups as tuples, rather than lazy iterators (spoiler: it takes 15-30% of the time of otherwise-equivalent lazy solutions, depending on whether the group is fully consumed or not). It's implemented at the C layer, and the implementation takes advantage of performance optimizations not available at the Python layer, which lets it out-perform any solution implemented at the Python layer handily. In particular:
- Unlike tuple-ing islice solutions, it always preallocates a tuple of size n up-front for each batch and populates it directly (where tuple-ing islices involves building the tuple up an element at a time, costing time on every batch; it uses amortized growth, but it can still involve a few reallocations per batch).
- Unlike zip_longest solutions (which, thanks to zip_longest's implementation, do use presized tuples, likely the source of their better performance in most cases), it:
  - looks up the .__next__ of the object once per batch, not n times per batch (a small cost, given the C layer lookup, but it's paid for every batch), and
  - need not fill with a fillvalue, nor locate it after the fact for the final batch (which requires at least O(log n) binary search, and pre-3.10, O(n) linear search, plus O(n) slicing work, for the final batch); batched is counting the elements as it pulls them as a side-effect of tracking the index to insert them into within the tuple, so when the iterator is exhausted, it stops immediately, and can directly realloc the tuple down to match the number of elements it was able to pull (avoiding a new tuple, and often avoiding any copying at all).

These advantages matter most for large n and/or large numbers of batches (and for 3.10-3.11, it's not meaningfully slower even in pathological cases).

When the chunk size is typically small, the fastest solution is this one, adapted from rhettg's answer:
from itertools import takewhile, zip_longest

def chunker(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
If performance is critical, especially when chunk sizes are large and you're doing this a lot, and you can rely on 3.10+ (which is when bisect added support for a key argument), you can improve the above a bit by replacing the O(n) takewhile with O(log n) bisection: add from bisect import bisect to the imports, and change:

yield tuple(takewhile(lambda v: v is not fillvalue, x))

to:

yield x[:bisect(x, False, key=lambda v: v is fillvalue)]  # 3.10+ only!
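Assembled, a sketch of the full 3.10+ variant (my assembly of the pieces above; behavior should match the takewhile version):

from bisect import bisect
from itertools import zip_longest

def chunker(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
    fillvalue = object()  # sentinel that can't appear in the input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # The key maps real elements to False and fill to True; the tuple is
            # "sorted" by that key, so bisect finds the first fill in O(log n)
            yield x[:bisect(x, False, key=lambda v: v is fillvalue)]  # 3.10+ only!
        else:
            yield x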
A denser alternative follows; it only beats the zip_longest-based solution above, by a little, in pathological cases (involving small numbers of huge batch sizes), losing by roughly a factor of 2x in common cases. By using purely C-level builtins (in CPython), no Python byte code is needed to produce each chunk (unless the underlying generator is implemented in Python), which has a huge performance benefit. It does walk each chunk before returning it, but it doesn't do any pre-walking beyond the chunk it's about to return:
# Only needed on Py2, to get iterator-based map; Py3's is already iterator-based
from future_builtins import map
from itertools import islice, repeat, starmap, takewhile
# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument prior to 3.10; in 3.10+, you can
# just use bool (which is trivially faster than truth)
from operator import truth

def chunker(n, iterable):  # n is size of each chunk; last chunk may be smaller
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))
Since that's a bit dense, the spread-out version for illustration:

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x
Wrapping a call to chunker in enumerate would let you number the chunks if it's needed.
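For example, a trivial usage sketch (sample data is mine):

for i, group in enumerate(chunker(3, 'ABCDEFG')):
    print(i, group)
# 0 ('A', 'B', 'C')
# 1 ('D', 'E', 'F')
# 2 ('G',)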
>>> from itertools import *
>>> from functools import partial
>>> from operator import itemgetter
>>> def chunks_moses(iterable, size=10):
...     c = count()
...     for _, g in groupby(iterable, lambda _: next(c)//size):
...         yield g
...
>>> def chunks_moses_optimized(iterable, size=10):
...     c = cycle((False,) * size + (True,) * size)
...     return map(itemgetter(1), groupby(iterable, partial(next, c)))
...
>>> %%timeit b = b'\0'*10000
... for grp in chunks_moses(b):
...     next(grp)  # Only consume one element, but rest of group skipped for you
...
1.35 ms ± 31.1 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %%timeit b = b'\0'*10000
... for grp in chunks_moses_optimized(b):
...     next(grp)  # Only consume one element, but rest of group skipped for you
...
469 μs ± 5.32 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
So pushing the same basic work to the C layer, and avoiding unnecessary Python-level int manipulation (creation of successive values, division), cuts the runtime to roughly a third.

These are equivalent in behavior to the prior solutions only if all groups are fully consumed before the next group is started, so we'll compare them to the prior solutions used in fully-consumed mode:
# Relies on same imports as above
>>> def chunks_tobias(iterable, size=10):
...     iterator = iter(iterable)
...     for first in iterator:
...         yield chain([first], islice(iterator, size - 1))
...
>>> def chunks_tobias_optimized(iterable, size=10):
...     it = iter(iterable)
...     make_islice = partial(islice, it, size - 1)
...     for first in it:
...         yield chain((first,), make_islice())
...
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_moses(b):
...     consume(grp)  # Consume all elements one at a time without storing them
...
1.26 ms ± 4.22 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_moses_optimized(b):
...     consume(grp)
...
480 μs ± 11.2 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_tobias(b):
...     consume(grp)
...
531 μs ± 15.4 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_tobias_optimized(b):
...     consume(grp)
...
461 μs ± 24.3 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
So chunking with chain and islice is slightly faster than an optimized use of groupby, at the expense of not properly handling partially consumed groups.
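To make that trade-off concrete, a small sketch (sample data is mine) using the two functions from the timing session above:

# groupby-based: skipping ahead discards the rest of the current group
for grp in chunks_moses_optimized('ABCDEF', size=3):
    print(next(grp))   # prints 'A' then 'D'; unconsumed 'B', 'C' are skipped

# chain+islice-based: unconsumed elements leak into subsequent groups
for grp in chunks_tobias_optimized('ABCDEF', size=3):
    print(next(grp))   # prints 'A', 'B', 'C', 'D', 'E', 'F' -- each unconsumed
                       # element starts what looks like a new group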
If you really can't store more than 1-2 elements in memory at once, these won't work, but usually that's not a real problem, so consider these options first:
>>> def chunks_rhettg_optimized(iterable, size=10):
...     fillvalue = object()
...     args = (iter(iterable),) * size
...     for x in zip_longest(*args, fillvalue=fillvalue):
...         if x[-1] is fillvalue:
...             yield tuple(takewhile(lambda v: v is not fillvalue, x))
...         else:
...             yield x
...
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_rhettg_optimized(b):
...     consume(grp)
...
195 μs ± 8.43 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in batched(b, 10):
...     consume(grp)
...
142 μs ± 4.87 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
And just for completeness, timings for those solutions when you only examine a single element of the result:
>>> %%timeit b = b'\0'*10000
... for grp in chunks_rhettg_optimized(b):
...     grp[0]
...
116 μs ± 712 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
>>> %%timeit b = b'\0'*10000
... for grp in batched(b, 10):
...     grp[0]
...
75 μs ± 186 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
Summary of timing information:

- If you can rely on 3.12+'s itertools.batched and can produce a tuple for the group, rather than producing a lazy iterator over that group, it's much faster than any solution that produces group iterators: producing tuples takes only ~30-45% of the time when using the entire result, and as little as ~15% of the time if you're only using a subset of each result (comparing lazy group iterator solutions that only use part of each group, but still skip the unconsumed elements prior to the next group, against itertools.batched only examining part of the group).
- The optimized groupby+cycle+partial+itemgetter+map solution (which will ensure the last group is consumed before the next group is produced, even if the last group wasn't fully consumed by the user) is almost as fast as the optimized use of chain+partial+islice, and guarantees an incompletely consumed group will still be consumed before the next group is produced (it does not guarantee the group is consumed if the next group isn't requested, but this is usually what you want). The only time I'd consider the trivially faster version that doesn't guarantee complete group consumption prior to the next group is when that's the desired behavior (if you don't fully consume a group, you want the next group to begin with the first unconsumed element).

Upvotes: 16
Reputation: 59604
How about using itertools.islice:
import itertools
els = iter(xrange(7))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
Which gives:
[0, 1]
[2, 3]
[4, 5]
[6]
Chunker implementation with some tests:
import itertools
from typing import Iterable
def chunker(iterable: Iterable, size: int) -> Iterable[list]:
    iterable = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterable, size))
        if not chunk:
            break
        yield chunk
assert list(chunker(range(10), 3)) == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
assert list(chunker([], 3)) == []
Upvotes: 4
Reputation: 28606
Three variations of ShadowRanger's fastest pre-Python-3.12 solution, using zip_longest to get chunk tuples and removing the fillvalue from the last chunk. They're the Stefan_* ones in this benchmark, for an iterable with 10^6 elements and chunk size n=2 to n=1000:

Tested on Python 3.10.8. No idea what's up with Stefan_stopped from n=7 to n=10; I ran the benchmark multiple times and it's always like that.
Stefan_hold

This one holds one chunk and, for each next chunk, yields the held chunk and then holds the next one instead. At the end, it unfills and yields the final held chunk.
def chunker(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)
Stefan_stopped

ShadowRanger checks if x[-1] is fillvalue: for each chunk x. In this solution, I instead use the faster if stopped: check of a bool value. To make that work, I chain an "empty" iterator to the input whose sole job is to set stopped to True:
def chunker(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk
Stefan_compress

This one eliminates Python interpretation during all but the last chunk. It uses tee to get three copies of the chunks iterator from zip_longest. The main copy contributes all but the last chunk. The fast copy is one chunk ahead; its sole job is to withhold the last chunk from main. The slow copy is one step behind fast and provides the last chunk.
def chunker(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )
unfill

The helper my solutions use to remove the fillvalue from the last chunk:

def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]
del optimization

ShadowRanger_del is a minor modification of ShadowRanger's solution, deleting the chunk variable after yielding the chunk, so that zip_longest can use its optimization of re-using its result tuple. My Stefan_stopped also uses this optimization.

Though that only helps/works if the consumer of our chunks iterator also doesn't keep a reference to the tuple, for example if it uses map(sum, chunker(...)) to compute sums of chunks of numbers.

Here's the same benchmark but without that del optimization:
Solutions, correctness checking, benchmarking, plotting.
import sys
print(sys.version)

import matplotlib.pyplot as plt
from itertools import takewhile, zip_longest, chain, compress, tee, repeat
from timeit import timeit
from statistics import mean, stdev
from collections import deque
import gc
from random import sample
from bisect import bisect

#----------------------------------------------------------------------
# Solutions
#----------------------------------------------------------------------

def ShadowRanger(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x

def ShadowRanger_del(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
            del x

def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]

def Stefan_stopped(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk

def Stefan_compress(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )

def Stefan_hold(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)

funcs = ShadowRanger, ShadowRanger_del, Stefan_stopped, Stefan_compress, Stefan_hold
#----------------------------------------------------------------------
# Correctness checks
#----------------------------------------------------------------------

def run(f):
    return list(f(n, iter(range(N))))

for n in range(1, 10):
    for N in range(100):
        args = n, range(N)
        expect = run(ShadowRanger)
        for f in funcs:
            result = run(f)
            if result != expect:
                raise Exception(f'{f.__name__} failed for {n=}, {N=}')
#----------------------------------------------------------------------
# Benchmarking
#----------------------------------------------------------------------

benchmarks = []

# Speed
N = 10 ** 6
for n in *range(2, 11), 20, 50, 100, 200, 500, 1000:
    for _ in range(1):
        times = {f: [] for f in funcs}
        def stats(f):
            ts = [t * 1e3 for t in sorted(times[f])[:10]]
            return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} ms '
        for _ in range(100):
            for f in sample(funcs, len(funcs)):
                gc.collect()
                t = timeit(lambda: deque(f(n, repeat(None, N)), 0), number=1)
                times[f].append(t)
        print(f'\n{n = }')
        for f in sorted(funcs, key=stats):
            print(stats(f), f.__name__)
        benchmarks.append((n, times))

#----------------------------------------------------------------------
# Plotting
#----------------------------------------------------------------------

names = [f.__name__ for f in funcs]
stats = [
    (n, [mean(sorted(times[f])[:10]) * 1e3 for f in funcs])
    for n, times in benchmarks
]
colors = {
    'Stefan_stopped': 'blue',
    'Stefan_compress': 'lime',
    'Stefan_hold': 'gold',
    'ShadowRanger': 'red',
    'ShadowRanger_del': 'pink',
}
ns = [n for n, _ in stats]
print('%28s' % 'chunk size', *('%5d' % n for n in ns))
print('-' * 95)
x = range(len(ns))
for i, name in enumerate(names):
    ts = [tss[i] for _, tss in stats]
    ts = [tss[i] / tss[0] * 100 for _, tss in stats]
    color = colors[name]
    if color:
        plt.plot(x, ts, '.-', color=color, label=name)
    print('%29s' % name, *('%5.1f' % t for t in ts))
plt.xticks(x, ns, size=9)
plt.ylim(0, 120)
plt.title('chunker(n, $10^6$ elements)', weight='bold')
plt.xlabel('Chunk size n', weight='bold')
plt.ylabel("Time (for complete iteration) in % relative to ShadowRanger's", weight='bold')
plt.legend(loc='lower center')
#plt.show()
plt.savefig('chunker_plot.png', dpi=200)
Upvotes: 1
Reputation: 778
more-itertools provides chunked and ichunked, which can achieve the goal; they are mentioned on the Python 3 itertools documentation page.
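For instance, a quick sketch (assumes pip install more-itertools):

from more_itertools import chunked, ichunked

# chunked builds each chunk eagerly as a list:
print(list(chunked(range(7), 3)))   # [[0, 1, 2], [3, 4, 5], [6]]

# ichunked yields lazy sub-iterables without pre-walking the source:
for chunk in ichunked(range(7), 3):
    print(list(chunk))              # [0, 1, 2] then [3, 4, 5] then [6]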
Upvotes: 8
Reputation: 1726
Inspired by Moses Koledoye's answer, I tried to make a solution that uses itertools.groupby but doesn't require a divide at each step.
The following function can be used as a key to groupby, and it simply returns a boolean, which flips after a pre-defined number of calls.
def chunks(chunksize=3):
    def flag_gen():
        flag = False
        while True:
            for num in range(chunksize):
                yield flag
            flag = not flag
    flag_iter = flag_gen()
    def flag_func(*args, **kwargs):
        return next(flag_iter)
    return flag_func
Which can be used like this:
from itertools import groupby

my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")
chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))

for flag, chunk in chunked_generator:
    print("Flag is {f}".format(f=flag), list(chunk))
Output:
Flag is False ['a', 'b', 'c', 'd', 'e']
Flag is True ['f', 'g', 'h', 'i', 'j']
Flag is False ['k', 'l', 'm', 'n', 'o']
Flag is True ['p', 'q', 'r', 's', 't']
Flag is False ['u', 'v', 'w', 'x', 'y']
Flag is True ['z']
I've made a fiddle demonstrating this code.
Upvotes: 1
Reputation: 78554
Another way to create groups/chunks, without pre-walking the generator, is to use itertools.groupby on a key function that uses an itertools.count object. Since the count object is independent of the iterable, the chunks can be easily generated without any knowledge of what the iterable holds.

Every iteration of groupby calls the next method of the count object and generates a group/chunk key (followed by items in the chunk) by doing an integer division of the current count value by the size of the chunk.
from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c)//size):
        yield g
Each group/chunk g yielded by the generator function is an iterator. However, since groupby uses a shared iterator for all groups, the group iterators cannot be stored in a list or any container; each group iterator should be consumed before the next.
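A short sketch of that caveat (sample data is mine):

# Consuming each group before requesting the next works as expected:
for g in chunks('ABCDEF', size=3):
    print(list(g))   # ['A', 'B', 'C'] then ['D', 'E', 'F']

# Storing the group iterators does NOT work: advancing groupby invalidates
# earlier groups, so they come back empty or truncated.
stored = list(chunks('ABCDEF', size=3))
print([list(g) for g in stored])   # not the chunks you wanted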
Upvotes: 17
Reputation: 697
I started to realize the usefulness of this scenario when crafting a solution for inserting 500k+ rows into a database at higher speed.

A generator processes the data from the source and yields it line by line; another generator then groups the output into chunks and yields it chunk by chunk. The second generator is only aware of the chunk size and nothing more.
Below is a sample to highlight the concept:
#!/usr/bin/python

def firstn_gen(n):
    num = 0
    while num < n:
        yield num
        num += 1

def chunk_gen(some_gen, chunk_size=7):
    res_chunk = []
    for count, item in enumerate(some_gen, 1):
        res_chunk.append(item)
        if count % chunk_size == 0:
            yield res_chunk
            res_chunk[:] = []
    else:
        # for-else: runs once the loop finishes, yielding the final partial chunk
        yield res_chunk

if __name__ == '__main__':
    for a_chunk in chunk_gen(firstn_gen(33)):
        print(a_chunk)
Tested in Python 2.7.12:
[0, 1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13]
[14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27]
[28, 29, 30, 31, 32]
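As a hypothetical sketch of the DB-insertion use case that motivated this (sqlite3 here; the table, column names, and row generator are all made up):

import sqlite3

def row_gen():
    # stand-in for the expensive line-by-line source generator
    for i in range(500000):
        yield (i, 'name_%d' % i)

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE items (id INTEGER, name TEXT)')
for batch in chunk_gen(row_gen(), chunk_size=1000):
    # each batch is consumed immediately, so chunk_gen reusing its
    # internal list between yields is safe here
    conn.executemany('INSERT INTO items VALUES (?, ?)', batch)
conn.commit()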
Upvotes: 3
Reputation: 84
from itertools import islice

def chunk(it, n):
    '''
    # returns chunks of n elements each
    >>> list(chunk(range(10), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
    >>> list(chunk(list(range(10)), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
    '''
    def _w(g):
        return lambda: tuple(islice(g, n))
    # two-argument iter: call the lambda until it returns the sentinel ()
    return iter(_w(iter(it)), ())
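The trick is the two-argument form of iter(callable, sentinel): it calls the callable repeatedly until the callable returns a value equal to the sentinel. A spelled-out sketch of the same idea (my naming):

from itertools import islice

def chunk_verbose(it, n):
    g = iter(it)
    def next_chunk():
        return tuple(islice(g, n))  # returns the empty tuple once g is exhausted
    # keep calling next_chunk() until it returns ()
    return iter(next_chunk, ())

print(list(chunk_verbose(range(10), 3)))
# [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]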
Upvotes: 2
Reputation: 4605
I had this same issue, but found a simpler solution than those mentioned here:
from itertools import chain, islice

def chunker(iterable, chunk_size):
    els = iter(iterable)
    while True:
        try:
            next_el = next(els)
        except StopIteration:
            return  # source exhausted; on Python 3.7+ (PEP 479) an escaping StopIteration would become a RuntimeError
        yield chain([next_el], islice(els, chunk_size - 1))

for i, chunk in enumerate(chunker(range(11), 2)):
    for el in chunk:
        print(i, el)
# Prints the following:
0 0
0 1
1 2
1 3
2 4
2 5
3 6
3 7
4 8
4 9
5 10
Upvotes: 2
Reputation: 82899
One way would be to peek at the first element, if any, and then create and return the actual generator.
def head(iterable, max=10):
    first = next(iterable)  # raise exception when depleted
    def head_inner():
        yield first  # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:  # cnt + 1 to include first
                break
    return head_inner()
Just use this in your chunk generator and catch the StopIteration exception like you did with your custom exception.
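On Python 3.7+ (PEP 479), a StopIteration escaping a generator turns into a RuntimeError, so the catch has to be explicit; a sketch of such a chunks wrapper (my naming) around the head above:

def chunks(iterable, size=10):
    it = iter(iterable)
    while True:
        try:
            chunk = head(it, size)  # next() inside head raises StopIteration when empty
        except StopIteration:
            return  # source exhausted; letting it escape would be a RuntimeError on 3.7+
        yield chunk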
Update: Here's another version, using itertools.islice to replace most of the head function, and a for loop. This simple for loop in fact does exactly the same thing as that unwieldy while-try-next-except-break construct in the original code, so the result is much more readable.
def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:     # stops when iterator is depleted
        def chunk():           # construct generator for next chunk
            yield first        # yield element from for loop
            for more in islice(iterator, size - 1):
                yield more     # yield more elements from the iterator
        yield chunk()          # in outer generator, yield next chunk
And we can get even shorter than that, using itertools.chain to replace the inner generator:

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))
Upvotes: 99
Reputation: 148965
EDIT: other solution, with a generator of generators.

You should not do a while True in your iterator, but simply iterate through it and update the chunk number at each iteration:

def chunk(it, maxv):
    n = 0
    for i in it:
        yield n // maxv, i
        n += 1
If you want a generator of generators, you can have:

def chunk(a, maxv):
    def inner(it, maxv, l):
        l[0] = False
        for i in range(maxv):
            yield next(it)
        l[0] = True
        raise StopIteration
    it = iter(a)
    l = [True]
    while l[0] == True:
        yield inner(it, maxv, l)
    raise StopIteration
with a being an iterable.
Tests, on Python 2.7 and 3.4:

for i in chunk(range(7), 3):
    print 'CHUNK'
    for a in i:
        print a
gives :
CHUNK
0
1
2
CHUNK
3
4
5
CHUNK
6
And on 2.7 :
for i in chunk(xrange(7), 3):
    print 'CHUNK'
    for a in i:
        print a
gives same result.
But BEWARE: list(chunk(range(7), 3)) hangs on both 2.7 and 3.4
Upvotes: -1
Reputation:
You've said you don't wish to store things in memory, so does this mean that you can't build an intermediate list for the current chunk?
Why not traverse the generator and insert a sentinel value between chunks? The consumer (or a suitable wrapper) could ignore the sentinel:
class Sentinel(object):
    pass

def chunk(els, size):
    for i, el in enumerate(els):
        yield el
        if (i + 1) % size == 0:  # the original test `i > 0 and i % size == 0` made the first chunk one element too long
            yield Sentinel
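A sketch of such a wrapper (my code), regrouping the flat stream on the sentinel; note it buffers one chunk at a time, which is exactly the intermediate-list question raised above:

def desentinel(flat):
    group = []
    for el in flat:
        if el is Sentinel:
            yield group
            group = []
        else:
            group.append(el)
    if group:  # final partial group, if any
        yield group

for grp in desentinel(chunk(range(7), 3)):
    print(grp)
# [0, 1, 2]
# [3, 4, 5]
# [6]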
Upvotes: 0