Reputation: 77
I am trying to use the itertools.combinations and itertools.islice functions in order to create a number of batches on which computations can be performed in parallel. I use the following function to create my batches:
import itertools
import math
from scipy.special import comb

def construct_batches(n, k, batch_size):
    combinations_slices = []
    # Calculate number of batches
    n_batches = math.ceil(comb(n, k, exact=True) / batch_size)
    # Construct iterator for combinations
    combinations = itertools.combinations(range(n), k)
    while len(combinations_slices) < n_batches:
        combinations_slices.append(itertools.islice(combinations, batch_size))
    return combinations_slices
After performing some computations, I find out which batches and elements are relevant. So I have a list of batches (e.g. batches = [2,3,1]) and a list of elements (e.g. elements = [5,7,0]). To my amazement/horror, Python has the following behaviour. Suppose I want to check if my slices are correct. Then
combinations_slices = construct_batches(n,k,batch_size)
list(combinations_slices[0])
Out[491]:
[(0, 1, 2, 3),
(0, 1, 2, 4),
(0, 1, 2, 5),
(0, 1, 2, 6),
(0, 1, 2, 7),
(0, 1, 2, 8),
(0, 1, 2, 9),
(0, 1, 3, 4),
(0, 1, 3, 5),
(0, 1, 3, 6)]
list(combinations_slices[1])
Out[492]:
[(0, 1, 3, 7),
(0, 1, 3, 8),
(0, 1, 3, 9),
(0, 1, 4, 5),
(0, 1, 4, 6),
(0, 1, 4, 7),
(0, 1, 4, 8),
(0, 1, 4, 9),
(0, 1, 5, 6),
(0, 1, 5, 7)]
This is all nice and jolly, and shows the approach has worked. However, if I use a list comprehension to select the "relevant" batches as combinations_slices = [combinations_slices[i] for i in range(len(combinations_slices)) if i in batches], then the output is (sadly):
combinations_slices = construct_batches(n,k,batch_size)
batches = [2,3,1]
combinations_slices = [combinations_slices[i] for i in range(len(combinations_slices)) if i in batches]
list(combinations_slices[0])
Out[509]:
[(0, 1, 2, 3),
(0, 1, 2, 4),
(0, 1, 2, 5),
(0, 1, 2, 6),
(0, 1, 2, 7),
(0, 1, 2, 8),
(0, 1, 2, 9),
(0, 1, 3, 4),
(0, 1, 3, 5),
(0, 1, 3, 6)]
list(combinations_slices[1])
Out[510]:
[(0, 1, 3, 7),
(0, 1, 3, 8),
(0, 1, 3, 9),
(0, 1, 4, 5),
(0, 1, 4, 6),
(0, 1, 4, 7),
(0, 1, 4, 8),
(0, 1, 4, 9),
(0, 1, 5, 6),
(0, 1, 5, 7)]
Is there any way to obtain the desired behaviour without casting everything to lists (in general these lists of combinations could be large so I would run out of memory...)? Suggestions appreciated...
Upvotes: 1
Views: 1694
Reputation: 1664
I'm a little confused by your code and description, but here are some pointers.
There are a couple of good batching tools in the more-itertools library. Take a look at chunked and ichunked in the grouping section. You'll need to pip install more-itertools to make them available to your code.
ichunked produces (lazy) islices and returns a generator, so it should not use much memory. But once you've read an islice or consumed the output of the generator, they're exhausted and can't be iterated over again.
from more_itertools import ichunked

numbers = range(27)
for batch in ichunked(numbers, 5):
    print(batch)
print(ichunked(numbers, 5))
Output (ichunked):
<itertools.islice object at 0x0000020BF63DD4F0>
<itertools.islice object at 0x0000020BF4519130>
<itertools.islice object at 0x0000020BF63DD4F0>
<itertools.islice object at 0x0000020BF4519130>
<itertools.islice object at 0x0000020BF63DD4F0>
<generator object ichunked at 0x00000261AFED7AC0>
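This laziness is also the source of the behaviour in your question: an islice over a shared iterator doesn't record a fixed range of positions, it simply draws the next items from that iterator whenever it is consumed, and it is empty afterwards. A minimal sketch with a plain iterator (illustrative, not taken from your code):

import itertools

it = iter(range(10))
first = itertools.islice(it, 3)
second = itertools.islice(it, 3)

print(list(second))  # [0, 1, 2] -- islice is lazy, so whichever slice is
print(list(first))   # [3, 4, 5] -- consumed first takes the first items
print(list(first))   # []       -- a consumed islice is exhausted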
chunked may be more useful if you need to analyse batches and then later submit them to your parallel computation. It will produce lists that you can read more than once, but as they're lists, they will use much more memory:
from more_itertools import chunked

numbers = range(27)
for batch in chunked(numbers, 5):
    print(batch)
print(chunked(numbers, 5))
Output (chunked):
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17, 18, 19]
[20, 21, 22, 23, 24]
[25, 26]
<callable_iterator object at 0x000002163A25E370>
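Because each batch is a plain list, you can hold on to the batches and index into them as often as you like. A small sketch of that trade-off (materialising every batch up front is exactly what costs the memory):

from more_itertools import chunked

batches = list(chunked(range(27), 5))  # materialises all batches at once
print(batches[3])     # [15, 16, 17, 18, 19] -- random access by batch index
print(batches[3][0])  # 15 -- lists can be re-read any number of times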
chunked itself returns a (callable) iterator, but each batch it yields is a plain list, so it's the batches, not the iterator, that give you random access to elements. This is useful, but uses a lot of RAM. You could get around the large memory footprint of the chunked option by using ichunked and performing two passes over the same data. Something like:
def selected_batches():
    # First pass: yield the indexes of the batches that meet the criteria.
    for batch_index, batchiter in enumerate(ichunked(numbers, 5)):
        if fulfils_criteria(batchiter):
            yield batch_index

def submit_to_parallel_computation():
    selected_batch_indexes = selected_batches()
    selected_batch_index = next(selected_batch_indexes)
    try:
        # Second pass: re-chunk the data and submit only the selected batches.
        for batch_index, batchiter in enumerate(ichunked(numbers, 5)):
            if batch_index == selected_batch_index:
                add_to_parallel_work_queue(batchiter)
                # add_to_parallel_work_queue(list(batchiter))
                selected_batch_index = next(selected_batch_indexes)
    except StopIteration:
        # no more selected batches
        pass
    wait_for_results()
This is a good match if the analysis/chunking part of the computation must use as little memory as possible and is a much smaller part of the overall computational task than the parallel computation you're submitting these batches to. It also relies on the original data source itself being something you can iterate over more than once.
If this is not the case (so you can't re-use the input data), you might consider writing the selected batches to an intermediate file (e.g. using the csv module from the standard library).
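A minimal sketch of that idea, where is_relevant is a made-up stand-in for whatever your actual selection criterion is:

import csv
import itertools

def is_relevant(combo):
    # Hypothetical criterion, purely for illustration.
    return sum(combo) % 7 == 0

# Single pass over the combinations: persist only the relevant ones,
# so the full set is never held in memory.
with open("relevant.csv", "w", newline="") as f:
    writer = csv.writer(f)
    for combo in itertools.combinations(range(10), 4):
        if is_relevant(combo):
            writer.writerow(combo)

# Later (or in another process): stream the relevant combinations back.
with open("relevant.csv", newline="") as f:
    for row in csv.reader(f):
        combo = tuple(map(int, row))
        # submit combo to the parallel computation here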
Upvotes: 2