Deathcrush

Reputation: 77

Creating batches with itertools.islice

I am trying to use the itertools.combinations and itertools.islice functions to create a number of batches on which computations can be performed in parallel. I use the following function to create my batches:

import itertools
import math
from scipy.special import comb

def construct_batches(n, k, batch_size):

    combinations_slices = []

    # Calculate the number of batches
    n_batches = math.ceil(comb(n, k, exact=True) / batch_size)

    # Construct an iterator for the combinations
    combinations = itertools.combinations(range(n), k)

    while len(combinations_slices) < n_batches:
        combinations_slices.append(itertools.islice(combinations, batch_size))

    return combinations_slices

After performing some computations, I find out which batches and elements are relevant. So I have a list of batches (e.g. batches = [2,3,1]) and a list of elements (e.g. elements = [5,7,0]). To my amazement/horror Python has the following behaviour. Suppose I want to check if my slices are correct. Then

combinations_slices = construct_batches(n,k,batch_size)

list(combinations_slices[0])
Out[491]: 
[(0, 1, 2, 3),
 (0, 1, 2, 4),
 (0, 1, 2, 5),
 (0, 1, 2, 6),
 (0, 1, 2, 7),
 (0, 1, 2, 8),
 (0, 1, 2, 9),
 (0, 1, 3, 4),
 (0, 1, 3, 5),
 (0, 1, 3, 6)]

list(combinations_slices[1])
Out[492]: 
[(0, 1, 3, 7),
 (0, 1, 3, 8),
 (0, 1, 3, 9),
 (0, 1, 4, 5),
 (0, 1, 4, 6),
 (0, 1, 4, 7),
 (0, 1, 4, 8),
 (0, 1, 4, 9),
 (0, 1, 5, 6),
 (0, 1, 5, 7)]

This is all nice and jolly, and shows the approach has worked. However, if I use a list comprehension to select the "relevant" batches as combinations_slices = [combinations_slices[i] for i in range(len(combinations_slices)) if i in batches], then the output is (sadly):

combinations_slices = construct_batches(n,k,batch_size)

batches = [2,3,1]

combinations_slices = [combinations_slices[i] for i in range(len(combinations_slices)) if i in batches]

list(combinations_slices[0])
Out[509]: 
[(0, 1, 2, 3),
 (0, 1, 2, 4),
 (0, 1, 2, 5),
 (0, 1, 2, 6),
 (0, 1, 2, 7),
 (0, 1, 2, 8),
 (0, 1, 2, 9),
 (0, 1, 3, 4),
 (0, 1, 3, 5),
 (0, 1, 3, 6)]

list(combinations_slices[1])
Out[510]: 
[(0, 1, 3, 7),
 (0, 1, 3, 8),
 (0, 1, 3, 9),
 (0, 1, 4, 5),
 (0, 1, 4, 6),
 (0, 1, 4, 7),
 (0, 1, 4, 8),
 (0, 1, 4, 9),
 (0, 1, 5, 6),
 (0, 1, 5, 7)]

Is there any way to obtain the desired behaviour without casting everything to lists (in general these lists of combinations could be large so I would run out of memory...)? Suggestions appreciated...
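For context, the behaviour can be reproduced with a minimal sketch using plain itertools: an islice does not record a starting position, so whichever slice is consumed first simply takes the next items from the shared underlying iterator.

```python
import itertools

it = iter(range(10))
first = itertools.islice(it, 3)
second = itertools.islice(it, 3)

# Consumption order, not creation order, decides the contents:
print(list(second))  # [0, 1, 2]
print(list(first))   # [3, 4, 5]
```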

Upvotes: 1

Views: 1694

Answers (1)

Kim

Reputation: 1664

I'm a little confused by your code and description, but here are some pointers.

There are a couple of good batching tools in the more-itertools library. Take a look at chunked and ichunked in the grouping section. You'll need to pip install more-itertools to make them available to your code.

ichunked produces (lazy) islices and returns a generator, so it should not use much memory. But once you've read an islice or consumed the output of the generator, they're exhausted and can't be iterated over again.

from more_itertools import ichunked

numbers = range(27)
for batch in ichunked(numbers, 5):
    print(batch)
print(ichunked(numbers, 5))

Output (ichunked):

<itertools.islice object at 0x0000020BF63DD4F0>
<itertools.islice object at 0x0000020BF4519130>
<itertools.islice object at 0x0000020BF63DD4F0>
<itertools.islice object at 0x0000020BF4519130>
<itertools.islice object at 0x0000020BF63DD4F0>
<generator object ichunked at 0x00000261AFED7AC0>
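For reference, here is a rough stdlib-only sketch of what ichunked-style lazy batching does (the helper name lazy_batches is illustrative, not part of any library). Note the caveat: each batch must be fully consumed before advancing to the next, otherwise the batches would overlap, since they all draw from one shared iterator.

```python
import itertools

def lazy_batches(iterable, size):
    """Yield lazy batches over one shared iterator, ichunked-style."""
    it = iter(iterable)
    while True:
        # Peek one element to detect exhaustion of the source.
        try:
            head = next(it)
        except StopIteration:
            return
        # Each batch is the peeked element plus the next size - 1 items.
        yield itertools.chain([head], itertools.islice(it, size - 1))

for batch in lazy_batches(range(12), 5):
    print(list(batch))  # prints [0, 1, 2, 3, 4], then [5, 6, 7, 8, 9], then [10, 11]
```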

chunked may be more useful if you need to analyse batches and then later submit them to your parallel computation. It will produce lists that you can read more than once, but as they're lists, they will use much more memory:

from more_itertools import chunked

numbers = range(27)
for batch in chunked(numbers, 5):
    print(batch)
print(chunked(numbers, 5))

Output (chunked):

[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17, 18, 19]
[20, 21, 22, 23, 24]
[25, 26]
<callable_iterator object at 0x000002163A25E370>

chunked itself returns a callable iterator, and the lists it yields provide random access to their elements. This is useful, but uses a lot of RAM. You could get around the large memory footprint of the chunked option by using ichunked and performing two passes over the same data. Something like:

def selected_batches():
    for batch_index, batchiter in enumerate(ichunked(numbers, 5)):
        if fulfils_criteria(batchiter):
            yield batch_index


def submit_to_parallel_computation():
    selected_batch_indexes = selected_batches()
    try:
        selected_batch_index = next(selected_batch_indexes)
        for batch_index, batchiter in enumerate(ichunked(numbers, 5)):
            if batch_index == selected_batch_index:
                add_to_parallel_work_queue(batchiter)
                # add_to_parallel_work_queue(list(batchiter))
                selected_batch_index = next(selected_batch_indexes)
    except StopIteration:
        # no more selected batches
        pass
    wait_for_results()
This is a good match if the analysis/chunking part of the computation must use as little memory as possible and is a much smaller part of the overall computational task than the parallel computation you're submitting these batches to. It also relies on the original data source itself being something you can iterate over more than once.

If this is not the case (so you can't re-use the input data) you might consider creating an intermediate file (e.g. using the csv module from the standard library).
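A minimal sketch of that intermediate-file idea, assuming the rows are integer combination tuples (write_rows and read_batch are illustrative names, not library functions). Writing streams one row at a time, and reading uses islice over the csv reader, so neither pass holds the full data in memory.

```python
import csv
import itertools

def write_rows(rows, path):
    """Stream rows (e.g. combination tuples) to a CSV file one at a time."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)

def read_batch(path, batch_index, batch_size):
    """Re-read a single batch of rows without loading the whole file."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        start = batch_index * batch_size
        return [tuple(map(int, row))
                for row in itertools.islice(reader, start, start + batch_size)]
```

For example, write_rows(itertools.combinations(range(10), 4), "combos.csv") followed by read_batch("combos.csv", 1, 10) returns the same ten tuples as the second batch in the question, starting at (0, 1, 3, 7).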

Upvotes: 2
