blueFast

Reputation: 44381

Split a generator into chunks without pre-walking it

(This question is related to this one and this one, but those are pre-walking the generator, which is exactly what I want to avoid)

I would like to split a generator in chunks. The requirements are:

I have tried the following code:

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        yield head(i, size)

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 3)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

And this somehow works:

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
^CTraceback (most recent call last):
  File "xxxx.py", line 15, in <module>
    for el in chunk:
  File "xxxx.py", line 2, in head
    for cnt, el in enumerate(iterable):
KeyboardInterrupt

Buuuut ... it never stops (I have to press ^C) because of the while True. I would like to stop that loop whenever the generator has been consumed, but I do not know how to detect that situation. I have tried raising an Exception:

class NoMoreData(Exception):
    pass

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break
    if cnt == 0 : raise NoMoreData()

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        try:
            yield head(i, size)
        except NoMoreData:
            break

# Sample generator: the real data is much more complex, and expensive to compute    
els = xrange(7)

for n, chunk in enumerate(chunks(els, 2)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

But then the exception is only raised in the context of the consumer, which is not what I want (I want to keep the consumer code clean):

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
Traceback (most recent call last):
  File "xxxx.py", line 22, in <module>
    for el in chunk:
  File "xxxx.py", line 9, in head
    if cnt == 0 : raise NoMoreData
__main__.NoMoreData()

How can I detect that the generator is exhausted in the chunks function, without walking it?

Upvotes: 84

Views: 35050

Answers (12)

ShadowRanger

Reputation: 155438

If you can't prewalk a chunk, you have to build this from parts; Python has nothing that will do it entirely for you.

For timings, see bottom of answer.

To push the most possible work to the C layer (which is often the fastest solution in CPython), and still guarantee unconsumed chunks are discarded, a modified form of Moses' solution is the best option (groupby does the work of cleaning up elements not consumed by the prior group):

from functools import partial
from itertools import cycle, groupby
from operator import itemgetter

def chunks(iterable, size=10):
    c = cycle((False,) * size + (True,) * size)  # Make a cheap iterator that will group in groups of size elements
    # groupby will pass an element to next as its second argument, but because
    # c is an infinite iterator, the second argument will never get used
    return map(itemgetter(1), groupby(iterable, partial(next, c)))
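
As a quick check of the alignment guarantee, here is a minimal sketch that reuses the function above and reads only the first element of each group. The sample input (range(10), size 3) and the name firsts are made up for illustration:

```python
from functools import partial
from itertools import cycle, groupby
from operator import itemgetter

def chunks(iterable, size=10):
    # Same groupby/cycle approach as above
    c = cycle((False,) * size + (True,) * size)
    return map(itemgetter(1), groupby(iterable, partial(next, c)))

# Read only the first element of every group; groupby discards the rest
# of each group when the next group is requested.
firsts = [next(group) for group in chunks(range(10), 3)]
print(firsts)  # [0, 3, 6, 9]
```

Each group still starts on a multiple of size, even though no group was fully consumed.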

If you never need to leave a group unconsumed, you can get a little faster with a modified form of tobias_k's answer:

from functools import partial
from itertools import chain, islice

def chunks(iterable, size=10):
    it = iter(iterable)
    make_islice = partial(islice, it, size - 1)
    for first in it:
        yield chain((first,), make_islice())

But the incremental benefit to this is small (see timings) so I'd suggest only using it if you want the first unconsumed element from a group to become the first element of the next group; otherwise, stick to the groupby-based solution that gives you better guarantees.
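
To make that difference concrete, here is a small sketch (sample input and variable names are assumptions for illustration) showing what the chain/islice version does when a group is abandoned after its first element:

```python
from functools import partial
from itertools import chain, islice

def chunks(iterable, size=10):
    it = iter(iterable)
    make_islice = partial(islice, it, size - 1)
    for first in it:
        yield chain((first,), make_islice())

# Fully consuming every group gives the expected chunks.
full = [list(group) for group in chunks(range(7), 3)]
print(full)    # [[0, 1, 2], [3, 4, 5], [6]]

# Taking only the first element of each group leaves the rest in the shared
# iterator, so the next group starts with the first unconsumed element.
firsts = [next(group) for group in chunks(range(10), 3)]
print(firsts)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```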

All that said, are you sure the groups need to be lazy? Because there are much better/simpler solutions if each chunk can be produced as a tuple, consuming each chunk eagerly, but the input as a whole lazily. The best options by Python versions follow:

Best semi-eager solution going forward (requires Python 3.12 or higher, released in October 2023), requiring no hand-rolled code at all, and being fastest in all scenarios if you can prewalk a chunk:

As of Python 3.12, there's a built-in for this, itertools.batched. The arguments are the reverse of the chunker recipes below, but it behaves the same way (batching into tuples of length n, with the final batch potentially being incomplete):

from itertools import batched

for batch in batched('ABCDEFG', 3):
    print(batch)

will output:

('A', 'B', 'C')
('D', 'E', 'F')
('G',)

The timings at the bottom of the answer show that it's by far the best solution if you can rely on 3.12+ and can produce the groups as tuples, rather than lazy iterators (spoiler: it takes 15-30% of the time of otherwise-equivalent lazy solutions, depending on whether the group is fully consumed or not). It's implemented at the C layer and the implementation takes advantage of performance optimizations not available at the Python layer, which lets it out-perform any solution implemented at the Python layer handily. In particular:

  • Unlike tupled islices solutions, it always preallocates a tuple of size n up-front for each batch and populates it directly (where tupleing islices involves building the tuple up an element at a time, costing time on every batch; it uses amortized growth, but it can still involve a few reallocations per batch)
  • Unlike the zip_longest solutions (which, thanks to zip_longest's implementation, do use presized tuples, likely the source of their better performance in most cases):
    1. (Every batch waste) It only has to look up the .__next__ of the object once per batch, not n times per batch (a small cost, given the C layer lookup, but it's paid for every batch), and
    2. (One-time waste tied to size of final incomplete batch) It doesn't need to create a sentinel for the fillvalue, nor locate it after the fact for the final batch (which requires at least O(log n) binary search, and pre-3.10, O(n) linear search, plus O(n) slicing work, for the final batch); batched is counting the elements as it pulls them as a side-effect of tracking the index to insert them into within the tuple, so when the iterator is exhausted, it stops immediately, and can directly realloc the tuple down to match the number of elements it was able to pull (avoiding a new tuple, and often avoiding any copying at all).

Fastest available solution prior to 3.12 for small-medium n and/or large numbers of batches (and for 3.10-3.11, it's not meaningfully slower even in pathological cases):

When the chunk size is typically small, the fastest solution is this one, adapted from rhettg's answer:

from itertools import takewhile, zip_longest

def chunker(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
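
A small check of why the fill value is a fresh object() rather than None. This is a sketch that repeats the function above; the sample inputs are made up. Because the sentinel is compared by identity, None values in the data are kept:

```python
from itertools import takewhile, zip_longest

def chunker(n, iterable):
    fillvalue = object()  # unique sentinel, compared by identity
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x

letters = list(chunker(3, 'ABCDEFG'))
print(letters)    # [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]

# None is ordinary data here and must not be mistaken for padding.
with_none = list(chunker(3, [None, 0, None, 1]))
print(with_none)  # [(None, 0, None), (1,)]
```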

If performance is critical, especially when chunk sizes are large and you're doing this a lot, and you can rely on 3.10+ (which is when bisect added support for a key argument), you can improve the above a bit by replacing O(n) takewhile with O(log n) bisection, adding from bisect import bisect to the imports, and changing:

yield tuple(takewhile(lambda v: v is not fillvalue, x))

to:

yield x[:bisect(x, False, key=lambda v: v is fillvalue)]  # 3.10+ only!

Older answer (still very fast, thanks to pushing all the work to the C layer, but loses to the zip_longest-based solution above by a little in all but pathological cases [involving small numbers of huge batch sizes], losing by roughly a factor of 2x in common cases):

By using purely C-level builtins (in CPython), no Python byte code is needed to produce each chunk (unless the underlying generator is implemented in Python) which has a huge performance benefit. It does walk each chunk before returning it, but it doesn't do any pre-walking beyond the chunk it's about to return:

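# Only needed on Py2, to get iterator-based map; Py3's is already iterator-based
try:
    from future_builtins import map
except ImportError:
    pass  # Py3: future_builtins does not exist and the builtin map is already lazy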

from itertools import islice, repeat, starmap, takewhile

# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument prior to 3.10; in 3.10+, you can
# just use bool (which is trivially faster than truth)
from operator import truth

def chunker(n, iterable):  # n is size of each chunk; last chunk may be smaller
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))

Since that's a bit dense, the spread-out version for illustration:

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x

Wrapping a call to chunker in enumerate would let you number the chunks if it's needed.
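
For instance, a minimal sketch of the enumerate wrapping, using the spread-out version and a made-up sample input:

```python
from itertools import islice

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x

# enumerate numbers the chunks as they are produced
numbered = list(enumerate(chunker(3, range(7))))
print(numbered)  # [(0, (0, 1, 2)), (1, (3, 4, 5)), (2, (6,))]
```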

Timings (on CPython 3.12.0, on RHEL8)

Solutions that lazily consume the group and ensure the alignment of groups is maintained, even if a group is not fully consumed:
>>> from itertools import *
>>> from functools import partial
>>> from operator import itemgetter
>>> def chunks_moses(iterable, size=10):
...     c = count()
...     for _, g in groupby(iterable, lambda _: next(c)//size):
...         yield g
...
>>> def chunks_moses_optimized(iterable, size=10):
...    c = cycle((False,) * size + (True,) * size)
...    return map(itemgetter(1), groupby(iterable, partial(next, c)))
...
>>> %%timeit b = b'\0'*10000
... for grp in chunks_moses(b):
...     next(grp)  # Only consume one element, but rest of group skipped for you
...
1.35 ms ± 31.1 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

>>> %%timeit b = b'\0'*10000
... for grp in chunks_moses_optimized(b):
...     next(grp)  # Only consume one element, but rest of group skipped for you
469 μs ± 5.32 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

So pushing the same basic work to the C layer, and avoiding unnecessary Python level int manipulation (creation of successive values, division) cuts the runtime to roughly a third (1.35 ms down to 469 μs).

Solutions that lazily consume the group, and don't consume unprocessed elements

These are equivalent in behavior to the prior solutions only if every group is fully consumed before the next group is started, so we'll compare them to the prior solutions used in fully consumed mode:

# Relies on same imports as above
>>> def chunks_tobias(iterable, size=10):
...     iterator = iter(iterable)
...     for first in iterator:
...         yield chain([first], islice(iterator, size - 1))
... 
>>> def chunks_tobias_optimized(iterable, size=10):
...     it = iter(iterable)
...     make_islice = partial(islice, it, size - 1)
...     for first in it:
...         yield chain((first,), make_islice())
... 
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_moses(b):
...     consume(grp)  # Consume all elements one at a time without storing them
...
1.26 ms ± 4.22 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_moses_optimized(b):
...     consume(grp)
...
480 μs ± 11.2 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_tobias(b):
...     consume(grp)
...
531 μs ± 15.4 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_tobias_optimized(b):
...     consume(grp)
...
461 μs ± 24.3 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

So chunking with chain and islice is slightly faster than an optimized use of groupby, at the expense of not properly handling partially consumed groups.

Solutions that eagerly complete each complete group before returning it (but still lazily process the iterator as a whole)

If you really can't store more than 1-2 elements in memory at once, these won't work, but usually that's not a real problem, so consider these options first:

>>> def chunks_rhettg_optimized(iterable, size=10):
...     fillvalue = object()
...     args = (iter(iterable),) * size
...     for x in zip_longest(*args, fillvalue=fillvalue):
...         if x[-1] is fillvalue:
...             yield tuple(takewhile(lambda v: v is not fillvalue, x))
...         else:
...             yield x
...
>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in chunks_rhettg_optimized(b):
...     consume(grp)
...
195 μs ± 8.43 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

>>> %%timeit b = b'\0'*10000; from collections import deque; consume = deque(maxlen=0).extend
... for grp in batched(b, 10):
...     consume(grp)
...
142 μs ± 4.87 μs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

And just for completeness, timings for those solutions when you only examine a single element of the result:

>>> %%timeit b = b'\0'*10000
... for grp in chunks_rhettg_optimized(b):
...     grp[0]
...
116 μs ± 712 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

>>> %%timeit b = b'\0'*10000
... for grp in batched(b, 10):
...     grp[0]
...
75 μs ± 186 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Summary of timing information:

  1. If you can afford the memory/resources to use a solution that eagerly produces a tuple for the group, rather than producing a lazy iterator over that group, it's much faster than any solution that produces group iterators. Producing tuples takes only ~30-45% of the time when using the entire result, and as little as ~15% of the time if you're only using a subset of each result (when comparing lazy group iterator solutions that only use part of each group, but still skip the unconsumed elements prior to the next group, to itertools.batched only examining part of the group).
  2. If you must use a lazy group iterator solution, an optimized use of groupby+cycle+partial+itemgetter+map (which will ensure the last group is consumed before the next group is produced, even if the last group wasn't fully consumed by the user) is almost as fast as the optimized use of chain+partial+islice, and guarantees an incompletely consumed group will still be consumed before the next group is produced (it does not guarantee the group is consumed if the next group isn't requested, but this is usually what you want). The only time I'd consider the trivially faster version that doesn't guarantee complete group consumption prior to the next group is when that's desired behavior (if you don't fully consume a group, you want the next group to begin with the first unconsumed element).

Upvotes: 16

warvariuc

Reputation: 59604

How about using itertools.islice:

import itertools

els = iter(xrange(7))

print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))

Which gives:

[0, 1]
[2, 3]
[4, 5]
[6]

Chunker implementation with some tests:

import itertools
from typing import Iterable


def chunker(iterable: Iterable, size: int) -> Iterable[list]:
    iterable = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterable, size))
        if not chunk:
            break
        yield chunk
    

assert list(chunker(range(10), 3)) == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
assert list(chunker([], 3)) == []

Upvotes: 4

Stefan Pochmann

Reputation: 28606

Three variations of ShadowRanger's fastest pre-Python-3.12 solution using zip_longest to get chunk tuples and removing the fillvalue from the last chunk. They're the Stefan_* ones in this benchmark for an iterable with 10^6 elements and chunk size n=2 to n=1000:

benchmark plot

Tested on Python 3.10.8. No idea what's up with Stefan_stopped from n=7 to n=10, I ran the benchmark multiple times and it's always like that.

Stefan_hold

This one holds one chunk, and for each next chunk, yields the held chunk and then holds the next one instead. At the end, unfill and yield the final held chunk.

def chunker(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)

Stefan_stopped

ShadowRanger checks if x[-1] is fillvalue: for each chunk x. In this solution, I instead use the faster if stopped: check of a bool value. To make that work, I chain an "empty" iterator to the input whose sole job is to set stopped to True:

def chunker(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk

Stefan_compress

This one eliminates Python interpretation during all but the last chunk. It uses tee to get three copies of the chunks iterator from zip_longest. The main copy contributes all but the last chunk. The fast copy is one chunk ahead, its sole job is to avoid the last chunk from main. The slow copy is one step behind fast and provides the last chunk.

def chunker(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )

unfill

The helper my solutions use to remove the fillvalue from the last chunk:

def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]

The del optimization

ShadowRanger_del is a minor modification of ShadowRanger's solution, deleting the chunk variable after yielding the chunk, so that zip_longest can use its optimization of re-using its result tuple. My Stefan_stopped also uses this optimization.

Though that only helps/works if the consumer of our chunks iterator also doesn't keep a reference to the tuple, for example if it uses map(sum, chunker(...)) to compute sums of chunks of numbers.

Here's the same benchmark but without that del optimization:

benchmark plot without del optimization

Full code

Solutions, correctness checking, benchmarking, plotting.

import sys
print(sys.version)
import matplotlib.pyplot as plt
from itertools import takewhile, zip_longest, chain, compress, tee, repeat
from timeit import timeit
from statistics import mean, stdev
from collections import deque
import gc
from random import sample
from bisect import bisect


#----------------------------------------------------------------------
# Solutions
#----------------------------------------------------------------------

def ShadowRanger(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x


def ShadowRanger_del(n, iterable):
    '''chunker(3, 'ABCDEFG') --> ('A', 'B', 'C'), ('D', 'E', 'F'),  ('G',)'''
    fillvalue = object()  # Anonymous sentinel object that can't possibly appear in input
    args = (iter(iterable),) * n
    for x in zip_longest(*args, fillvalue=fillvalue):
        if x[-1] is fillvalue:
            # takewhile optimizes a bit for when n is large and the final
            # group is small; at the cost of a little performance, you can
            # avoid the takewhile import and simplify to:
            # yield tuple(v for v in x if v is not fillvalue)
            yield tuple(takewhile(lambda v: v is not fillvalue, x))
        else:
            yield x
            del x


def unfill(chunk, fillvalue):
    return chunk[:bisect(chunk, False, key=lambda value: value is fillvalue)]


def Stefan_stopped(n, iterable):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    fillvalue = object()
    it = iter(iterable)
    most = repeat(it, n-1)
    last = chain(it, stop())
    for chunk in zip_longest(*most, last, fillvalue=fillvalue):
        if stopped:
            yield unfill(chunk, fillvalue)
        else:
            yield chunk
            del chunk


def Stefan_compress(n, iterable):
    fillvalue = object()
    args = (iter(iterable),) * n
    chunks = zip_longest(*args, fillvalue=fillvalue)
    main, fast, slow = tee(chunks, 3)
    next(fast, None)
    return chain(
        compress(main, zip(fast, slow)),
        (unfill(chunk, fillvalue) for chunk in slow)
    )


def Stefan_hold(n, iterable):
    fillvalue = object()
    args = repeat(iter(iterable), n)
    chunks = zip_longest(*args, fillvalue=fillvalue)
    for chunk in chunks:
        for next_chunk in chunks:
            yield chunk
            chunk = next_chunk
        yield unfill(chunk, fillvalue)


funcs = ShadowRanger, ShadowRanger_del, Stefan_stopped, Stefan_compress, Stefan_hold


#----------------------------------------------------------------------
# Correctness checks
#----------------------------------------------------------------------

def run(f):
    return list(f(n, iter(range(N))))
for n in range(1, 10):
    for N in range(100):
        args = n, range(N)
        expect = run(ShadowRanger)
        for f in funcs:
            result = run(f)
            if result != expect:
                raise Exception(f'{f.__name__} failed for {n=}, {N=}')


#----------------------------------------------------------------------
# Benchmarking
#----------------------------------------------------------------------

benchmarks = []

# Speed
N = 10 ** 6
for n in *range(2, 11), 20, 50, 100, 200, 500, 1000:
  for _ in range(1):
    times = {f: [] for f in funcs}
    def stats(f):
        ts = [t * 1e3 for t in sorted(times[f])[:10]]
        return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} ms '

    for _ in range(100):
        for f in sample(funcs, len(funcs)):
            gc.collect()
            t = timeit(lambda: deque(f(n, repeat(None, N)), 0), number=1)
            times[f].append(t)

    print(f'\n{n = }')
    for f in sorted(funcs, key=stats):
        print(stats(f), f.__name__)

    benchmarks.append((n, times))


#----------------------------------------------------------------------
# Plotting
#----------------------------------------------------------------------

names = [f.__name__ for f in funcs]
stats = [
    (n, [mean(sorted(times[f])[:10]) * 1e3 for f in funcs])
    for n, times in benchmarks
]

colors = {
    'Stefan_stopped': 'blue',
    'Stefan_compress': 'lime',
    'Stefan_hold': 'gold',
    'ShadowRanger': 'red',
    'ShadowRanger_del': 'pink',
}
ns = [n for n, _ in stats]
print('%28s' % 'chunk size', *('%5d' % n for n in ns))
print('-' * 95)
x = range(len(ns))
for i, name in enumerate(names):
    ts = [tss[i] for _, tss in stats]
    ts = [tss[i] / tss[0] * 100 for _, tss in stats]
    color = colors[name]
    if color:
        plt.plot(x, ts, '.-', color=color, label=name)
        print('%29s' % name, *('%5.1f' % t for t in ts))
plt.xticks(x, ns, size=9)
plt.ylim(0, 120)
plt.title('chunker(n, $10^6$ elements)', weight='bold')
plt.xlabel('Chunk size n', weight='bold')
plt.ylabel("Time (for complete iteration) in % relative to ShadowRanger's", weight='bold')
plt.legend(loc='lower center')
#plt.show()
plt.savefig('chunker_plot.png', dpi=200)

Upvotes: 1

duyue

Reputation: 778

more-itertools provides chunked and ichunked, which can achieve the goal; it is mentioned on the Python 3 itertools documentation page.

chunked and ichunked example

Upvotes: 8

vowel-house-might

Reputation: 1726

Inspired by Moses Koledoye's answer, I tried to make a solution that uses itertools.groupby but doesn't require a divide at each step.

The following function can be used as a key to groupby, and it simply returns a boolean, which flips after a pre-defined number of calls.

def chunks(chunksize=3):

    def flag_gen():
        flag = False
        while True:
            for num in range(chunksize):
                yield flag
            flag = not flag

    flag_iter = flag_gen()

    def flag_func(*args, **kwargs):
        return next(flag_iter)

    return flag_func

Which can be used like this:

from itertools import groupby

my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")

chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))

for flag, chunk in chunked_generator:
    print("Flag is {f}".format(f=flag), list(chunk))

Output:

Flag is False ['a', 'b', 'c', 'd', 'e']
Flag is True ['f', 'g', 'h', 'i', 'j']
Flag is False ['k', 'l', 'm', 'n', 'o']
Flag is True ['p', 'q', 'r', 's', 't']
Flag is False ['u', 'v', 'w', 'x', 'y']
Flag is True ['z']

I've made a fiddle demonstrating this code.

Upvotes: 1

Moses Koledoye

Reputation: 78554

Another way to create groups/chunks and not prewalk the generator is using itertools.groupby on a key function that uses an itertools.count object. Since the count object is independent of the iterable, the chunks can be easily generated without any knowledge of what the iterable holds.

Every iteration of groupby calls the next method of the count object and generates a group/chunk key (followed by items in the chunk) by doing an integer division of the current count value by the size of the chunk.

from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c)//size):
        yield g

Each group/chunk g yielded by the generator function is an iterator. However, since groupby uses a shared iterator for all groups, the group iterators cannot be stored in a list or any other container; each group iterator should be consumed before the next one is requested.
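
A short sketch of that caveat (the sample input and variable names are made up): consuming each group as it arrives works, while collecting the group iterators first loses their contents, because advancing groupby moves past them.

```python
from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c) // size):
        yield g

# Consume each group before asking for the next one: works as intended.
consumed_in_order = [list(g) for g in chunks(range(7), 3)]
print(consumed_in_order)  # [[0, 1, 2], [3, 4, 5], [6]]

# Store all group iterators first, then read them: by then the shared
# iterator has moved on, so elements are missing from the stored groups.
stored_first = [list(g) for g in list(chunks(range(7), 3))]
print(stored_first)
```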

Upvotes: 17

Down the Stream

Reputation: 697

Started to realize the usefulness of this scenario when crafting a solution to DB insertion of 500k+ rows at higher speed.

A generator processes the data from source and "yield"s it line by line; and then another generator groups the output in chunks and "yield"s it chunk by chunk. The second generator is only aware of the chunk size and nothing more.

Below is a sample to highlight the concept:

#!/usr/bin/python

def firstn_gen(n):
    num = 0
    while num < n:
        yield num
        num += 1

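def chunk_gen(some_gen, chunk_size=7):
    res_chunk = []
    for count, item in enumerate(some_gen, 1):
        res_chunk.append(item)
        if count % chunk_size == 0:
            yield res_chunk
            res_chunk = []  # start a fresh list so chunks already yielded are not mutated
    if res_chunk:  # emit the final partial chunk, but never an empty one
        yield res_chunk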


if __name__ == '__main__':
    for a_chunk in chunk_gen(firstn_gen(33)):
        print(a_chunk)

Tested in Python 2.7.12:

[0, 1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13]
[14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27]
[28, 29, 30, 31, 32]

Upvotes: 3

igiroux

Reputation: 84

from itertools import islice
def chunk(it, n):
    '''
    Return an iterator over tuples of up to n elements each.

    >>> list(chunk(range(10), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]

    >>> list(chunk(list(range(10)), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
    '''
    def _w(g):
        return lambda: tuple(islice(g, n))
    return iter(_w(iter(it)), ())

Upvotes: 2

santon

Reputation: 4605

I had this same issue, but found a simpler solution than those mentioned here:

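from itertools import chain, islice

def chunker(iterable, chunk_size):
    els = iter(iterable)
    while True:
        try:
            next_el = next(els)
        except StopIteration:
            return  # on Python 3.7+ an uncaught StopIteration here becomes RuntimeError
        yield chain([next_el], islice(els, chunk_size - 1))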

for i, chunk in enumerate(chunker(range(11), 2)):
    for el in chunk:
        print(i, el)

# Prints the following:
0 0
0 1
1 2
1 3
2 4
2 5
3 6
3 7
4 8
4 9
5 10

Upvotes: 2

tobias_k

Reputation: 82899

One way would be to peek at the first element, if any, and then create and return the actual generator.

def head(iterable, max=10):
    first = next(iterable)      # raise exception when depleted
    def head_inner():
        yield first             # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:  # cnt + 1 to include first
                break
    return head_inner()

Just use this in your chunk generator and catch the StopIteration exception like you did with your custom exception.
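
A minimal sketch of how that could look, assuming the head function above is used unchanged. The shape of chunks here is my guess at "catch the StopIteration", not code from the question. Note that, as written, head yields the first element plus up to max more, so each chunk holds up to max + 1 elements, like the head in the question.

```python
def head(iterable, max=10):
    first = next(iterable)      # raises StopIteration when depleted
    def head_inner():
        yield first             # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:
                break
    return head_inner()

def chunks(iterable, size=10):
    iterator = iter(iterable)
    while True:
        try:
            chunk = head(iterator, size)
        except StopIteration:
            return              # source is exhausted: stop cleanly
        yield chunk

# Each chunk must be consumed before the next one is requested.
result = [list(chunk) for chunk in chunks(range(7), 2)]
print(result)  # [[0, 1, 2], [3, 4, 5], [6]]
```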


Update: Here's another version, using itertools.islice to replace most of the head function, and a for loop. This simple for loop in fact does exactly the same thing as that unwieldy while-try-next-except-break construct in the original code, so the result is much more readable.

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:    # stops when iterator is depleted
        def chunk():          # construct generator for next chunk
            yield first       # yield element from for loop
            for more in islice(iterator, size - 1):
                yield more    # yield more elements from the iterator
        yield chunk()         # in outer generator, yield next chunk

And we can get even shorter than that, using itertools.chain to replace the inner generator:

from itertools import chain, islice

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))

Upvotes: 99

Serge Ballesta

Reputation: 148965

EDIT other solution with a generator of generators

You should not use a while True in your generator, but simply iterate through the input and update the chunk number at each iteration:

def chunk(it, maxv):
    n = 0
    for i in it:
        yield n // maxv, i  # (chunk number, element)
        n += 1
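
A short usage sketch of this flat version (sample input made up, with the parameter name spelled maxv throughout): the consumer receives (chunk number, element) pairs and never has to deal with nested generators.

```python
def chunk(it, maxv):
    n = 0
    for i in it:
        yield n // maxv, i  # (chunk number, element)
        n += 1

pairs = list(chunk(range(7), 3))
print(pairs)
# [(0, 0), (0, 1), (0, 2), (1, 3), (1, 4), (1, 5), (2, 6)]

for number, el in chunk(range(7), 3):
    print('Chunk %3d, value %d' % (number, el))
```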

If you want a generator of generators, you can have :

def chunk(a, maxv):
    def inner(it, maxv, l):
        l[0] = False
        for i in range(maxv):
            try:
                item = next(it)
            except StopIteration:
                return  # source exhausted; l[0] stays False, so the outer loop ends
            yield item
        l[0] = True
    it = iter(a)
    l = [True]
    while l[0]:
        yield inner(it, maxv, l)

with a being an iterable.

Tests : on python 2.7 and 3.4:

for i in chunk(range(7), 3):
    print 'CHUNK'
    for a in i:
        print a

gives :

CHUNK
0
1
2
CHUNK
3
4
5
CHUNK
6

And on 2.7 :

for i in chunk(xrange(7), 3):
    print 'CHUNK'
    for a in i:
        print a

gives same result.

But BEWARE: list(chunk(range(7), 3)) blocks on 2.7 and 3.4, because the outer loop only stops once an inner chunk has actually been consumed.

Upvotes: -1

user1961503

Reputation:

You've said you don't wish to store things in memory, so does this mean that you can't build an intermediate list for the current chunk?

Why not traverse the generator and insert a sentinel value between chunks? The consumer (or a suitable wrapper) could ignore the sentinel:

class Sentinel(object):
    pass

def chunk(els, size):
    # Count from 1 so the sentinel follows every size-th element
    for i, el in enumerate(els, 1):
        yield el
        if i % size == 0:
            yield Sentinel
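
A possible wrapper, as a sketch: the numbered helper is hypothetical, and the chunk function here counts from 1 so that a marker follows every size-th element (an assumption about the intended behavior). The wrapper turns the flat stream back into (chunk number, element) pairs and drops the markers.

```python
class Sentinel(object):
    pass

def chunk(els, size):
    # Counting from 1 so that a marker follows every size-th element
    for i, el in enumerate(els, 1):
        yield el
        if i % size == 0:
            yield Sentinel

def numbered(stream):
    """Hypothetical wrapper: yield (chunk_number, element), skipping markers."""
    n = 0
    for el in stream:
        if el is Sentinel:
            n += 1  # a marker closes the current chunk
        else:
            yield n, el

pairs = list(numbered(chunk(range(7), 3)))
print(pairs)
# [(0, 0), (0, 1), (0, 2), (1, 3), (1, 4), (1, 5), (2, 6)]
```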

Upvotes: 0
