Trader001

Reputation: 1004

Partition a Java 8 Stream

How can I implement a "partition" operation on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It should behave like Guava's Iterators.partition() method, except that the partitions should ideally be lazily evaluated Streams rather than Lists.

partition

public static <T> UnmodifiableIterator<List<T>> partition(Iterator<T> iterator,
                                                          int size)

Divides an iterator into unmodifiable sublists of the given size (the final list may be smaller). For example, partitioning an iterator containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer iterator containing two inner lists of three and two elements, all in the original order.

Upvotes: 77

Views: 65858

Answers (12)

M. Justin

Reputation: 21122

The Gatherers.windowFixed gatherer, finalized in Java 24 (and available as a preview feature since Java 22), partitions a stream into groups of a given size:

// [[0, 1, 2], [3, 4, 5], [6]]
Stream<List<Integer>> stream =
        Stream.of(0, 1, 2, 3, 4, 5, 6).gather(Gatherers.windowFixed(3));

This uses the new Stream.gather method with the new windowFixed gatherer to convert the Stream<Integer> to a Stream<List<Integer>>. The feature was added by JEP 485: Stream Gatherers.


Your question indicated that you wanted a stream of streams, not a stream of lists. To do that, the standard existing Stream.map method can be applied to the previous result to convert the lists to streams:

Stream<Stream<Integer>> stream = Stream.of(0, 1, 2, 3, 4, 5, 6)
        .gather(Gatherers.windowFixed(3))
        .map(Collection::stream);

Javadocs

Gatherer:

An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. […]

[…]

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.

Stream.gather:

Returns a stream consisting of the results of applying the given gatherer to the elements of this stream.

Gatherers.windowFixed

Returns a Gatherer that gathers elements into windows -- encounter-ordered groups of elements -- of a fixed size. If the stream is empty then no window will be produced. The last window may contain fewer elements than the supplied window size.

Example:

// will contain: [[1, 2, 3], [4, 5, 6], [7, 8]]
List<List<Integer>> windows =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();

Upvotes: 3

WarGoth

Reputation: 648

I found an elegant solution using Iterables.partition from Guava:

Iterable<List<T>> parts = Iterables.partition(stream::iterator, size);

Upvotes: 14

M. Justin

Reputation: 21122

This Guava-based solution will convert a Stream<Integer> into a partitioned Stream<Stream<Integer>>:

Stream<Integer> stream = Stream.of(0, 1, 2, 3, 4, 5, 6);

// [[0, 1, 2], [3, 4, 5], [6]]
Stream<Stream<Integer>> partitioned =
        Streams.stream(Iterators.partition(stream.iterator(), 3))
                .map(Collection::stream);

Walkthrough

  1. Start with a Stream<T>

    // [0, 1, 2, 3, 4, 5, 6]
    Stream<Integer> stream = Stream.of(0, 1, 2, 3, 4, 5, 6);
    
  2. Convert to Iterator<T> using Stream.iterator.

    // [0, 1, 2, 3, 4, 5, 6]
    Iterator<Integer> iterator = stream.iterator();
    
  3. Partition to an Iterator<List<T>> using Iterators.partition in Guava.

    // [[0, 1, 2], [3, 4, 5], [6]]
    Iterator<List<Integer>> iteratorOfLists = Iterators.partition(iterator, 3);
    
  4. Convert to a Stream<List<T>> using Streams.stream in Guava.

    // [[0, 1, 2], [3, 4, 5], [6]]
    Stream<List<Integer>> streamOfLists = Streams.stream(iteratorOfLists);
    
  5. Convert to a Stream<Stream<T>> using Stream.map.

    // Stream<Stream<T>> [[0, 1, 2], [3, 4, 5], [6]]
    Stream<Stream<Integer>> streamOfStreams = streamOfLists.map(Collection::stream);
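If Guava is not available, steps 2 to 5 can be approximated with the JDK alone. The following is only a minimal sketch: the class name IteratorPartitionSketch and its partition method are made up for illustration, the hand-written iterator stands in for Iterators.partition, and (unlike Guava) the inner lists are plain mutable ArrayLists.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper for illustration; not part of Guava or the JDK.
class IteratorPartitionSketch {

    static <T> Stream<Stream<T>> partition(Stream<T> stream, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size = " + size);
        }
        // Step 2: Stream<T> -> Iterator<T>
        Iterator<T> iterator = stream.iterator();

        // Step 3: Iterator<T> -> Iterator<List<T>> (stand-in for Iterators.partition)
        Iterator<List<T>> iteratorOfLists = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public List<T> next() {
                if (!iterator.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Fill one batch; the final batch may be smaller than size
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && iterator.hasNext()) {
                    batch.add(iterator.next());
                }
                return batch;
            }
        };

        // Step 4: Iterator<List<T>> -> Stream<List<T>> (stand-in for Streams.stream)
        Stream<List<T>> streamOfLists = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(
                        iteratorOfLists, Spliterator.ORDERED | Spliterator.NONNULL),
                false);

        // Step 5: Stream<List<T>> -> Stream<Stream<T>>
        return streamOfLists.map(List::stream);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = partition(Stream.of(0, 1, 2, 3, 4, 5, 6), 3)
                .map(s -> s.collect(Collectors.toList()))
                .collect(Collectors.toList());
        System.out.println(result); // [[0, 1, 2], [3, 4, 5], [6]]
    }
}
```

The resulting stream is sequential, since the batches are produced by pulling from a single iterator.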
    

Upvotes: 0

user_3380739

Reputation: 1254

Here is a quick solution using abacus-common:

IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));

Disclaimer: I'm the developer of abacus-common.

Upvotes: 0

maha.benjebara

Reputation: 242

This is a performant way to partition a List:

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

public final class Partition<T> extends AbstractList<List<T>> {

    private final List<T> list;
    private final int chunkSize;

    public Partition(List<T> list, int chunkSize) {
        // Defensive copy, so later changes to the source list do not affect the partition
        this.list = new ArrayList<>(list);
        this.chunkSize = chunkSize;
    }

    public static <T> Partition<T> ofSize(List<T> list, int chunkSize) {
        return new Partition<>(list, chunkSize);
    }

    @Override
    public List<T> get(int index) {
        // Reject negative indexes and indexes at or past the number of chunks
        if (index < 0 || index >= size()) {
            throw new IndexOutOfBoundsException("Index " + index + " is out of the list range <0," + (size() - 1) + ">");
        }

        int start = index * chunkSize;
        int end = Math.min(start + chunkSize, list.size());

        return new ArrayList<>(list.subList(start, end));
    }

    @Override
    public int size() {
        // Number of chunks, rounded up
        return (int) Math.ceil((double) list.size() / (double) chunkSize);
    }

}

Usage

Partition<String> partition = Partition.ofSize(paCustomerCodes, chunkSize);

for (List<String> strings : partition) {
}

Upvotes: 1

domax

Reputation: 649

Here is a pure Java 8 solution - both sequential and parallel:

  public <T> Collection<List<T>> chunk(Collection<T> collection, int chunkSize) {
    final AtomicInteger index = new AtomicInteger();
    return collection.stream()
        .map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
        // LinkedHashMap is used here just to preserve order
        .collect(groupingBy(Entry::getKey, LinkedHashMap::new, mapping(Entry::getValue, toList())))
        .values();
  }

  public <T> Collection<List<T>> chunkParallel(Collection<T> collection, int chunkSize) {
    final AtomicInteger index = new AtomicInteger();
    return collection.parallelStream()
        .map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
        // Because this runs in parallel, ordering cannot be preserved,
        // and the map has to be thread-safe, e.g. a ConcurrentHashMap
        .collect(groupingBy(Entry::getKey, ConcurrentHashMap::new, mapping(Entry::getValue, toList())))
        .values();
  }

Upvotes: 0

Hei

Reputation: 61

This is a pure Java solution in which the batches are produced lazily as the stream is consumed, rather than by collecting everything into a List first.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize) {
    // One-element holder, just to make the current batch replaceable from the lambdas
    List<List<T>> currentBatch = new ArrayList<List<T>>();
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(
            stream.sequential()
                  .map(new Function<T, List<T>>() {
                      public List<T> apply(T t) {
                          currentBatch.get(0).add(t);
                          // Emit the batch once it is full, otherwise null (filtered out below)
                          return currentBatch.get(0).size() == batchSize
                                  ? currentBatch.set(0, new ArrayList<>(batchSize))
                                  : null;
                      }
                  }),
            // Once the source is exhausted, emit the trailing partial batch, if any
            Stream.generate(() -> currentBatch.get(0).isEmpty() ? null : currentBatch.get(0))
                  .limit(1)
    ).filter(Objects::nonNull);
}

The method returns Stream<List<T>> for flexibility. You can convert it to Stream<Stream<T>> easily by partition(something, 10).map(List::stream).

Upvotes: 5

rloeffel

Reputation: 174

The most elegant pure Java 8 solution I have found for this problem:

public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
    return IntStream.range(0, getNumberOfPartitions(list, batchSize))
                    .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
                    .collect(toList());
}

// https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
    return (list.size() + batchSize - 1) / batchSize;
}

Upvotes: 2

John McClean

Reputation: 5313

Provided you want to use the Stream sequentially, it is possible to partition a Stream (as well as perform related functions such as windowing, which I think is what you really want in this case). Two libraries that support partitioning for standard Streams are cyclops-react (I am the author) and jOOλ, which cyclops-react extends (to add functionality such as windowing).

cyclops-streams has a collection of static functions StreamUtils for operating on Java Streams, and a series of functions such as splitAt, headAndTail, splitBy, partition for partitioning.

To window a Stream into a Stream of nested Streams of size 30 you can use the window method.

To the OP's point: in streaming terms, splitting a Stream into multiple Streams of a given size is a windowing operation (rather than a partitioning operation).

  Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);

There is a Stream extension class called ReactiveSeq that extends jool.Seq and adds Windowing functionality, that may make the code a little cleaner.

  ReactiveSeq<Integer> seq;
  ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);

As Tagir points out in his answer though, this isn't suitable for parallel Streams. If you want to window or batch a Stream that you wish to execute in a multithreaded fashion, LazyFutureStream in cyclops-react might be useful (windowing is on the to-do list, but plain old batching is available now).

In this case data will be passed from the multiple threads executing the Stream to a Multi-Producer/Single-Consumer wait-free Queue and the sequential data from that queue can be windowed before being distributed to threads again.

  Stream<List<Data>> batched = new LazyReact().range(0,1000)
                                              .grouped(30)
                                              .map(this::process);

Upvotes: 10

Tagir Valeev

Reputation: 100209

It's impossible to partition an arbitrary source stream into fixed-size batches, because this would break parallel processing. When processing in parallel, you may not know how many elements the first sub-task contains after the split, so you cannot create the partitions for the next sub-task until the first is fully processed.
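To make the splitting problem concrete, here is a small sketch (the class name SplitBoundarySketch is made up for illustration) that splits a list's spliterator once, the way a parallel stream would, and reports the sizes of the two sub-tasks. The exact split point is an implementation detail of the source; the assumption here is only that it rarely lines up with a batch boundary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class for illustration only.
class SplitBoundarySketch {

    // Returns {size of the split-off prefix, size of what remains}
    static long[] splitSizes(List<?> source) {
        Spliterator<?> rest = source.spliterator();
        Spliterator<?> prefix = rest.trySplit(); // null if the source refuses to split
        long prefixSize = prefix == null ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        int batchSize = 3;
        long[] sizes = splitSizes(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7));
        System.out.println("prefix=" + sizes[0] + ", rest=" + sizes[1]);
        // A non-zero remainder means one batch would have to straddle both sub-tasks
        System.out.println("prefix % batchSize = " + (sizes[0] % batchSize));
    }
}
```

For a list the sizes are at least known up front. For an arbitrary stream (for example after a filter) the number of elements in the first sub-task is not known until it has been processed, which is the situation described above.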

However, it is possible to create a stream of partitions from a random-access List. Such a feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

Or if you really want the stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

If you don't want to depend on third-party libraries, you can implement such an ofSubLists method manually:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

This implementation looks a little bit long, but it takes into account some corner cases, such as a list size close to Integer.MAX_VALUE.
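As an illustration of that corner case, the sketch below compares the common round-up formula with the arithmetic used in ofSubLists. The class and method names are made up for this example; only int arithmetic is involved, no list is allocated.

```java
// Hypothetical demo class for illustration only.
class ChunkCountSketch {

    // Rounds up by adding (length - 1) first; the addition can overflow int
    static int naiveChunkCount(int size, int length) {
        return (size + length - 1) / length;
    }

    // Same arithmetic as ofSubLists: (size - 1) / length chunks, plus the last one
    static int safeChunkCount(int size, int length) {
        return size <= 0 ? 0 : (size - 1) / length + 1;
    }

    public static void main(String[] args) {
        System.out.println(naiveChunkCount(7, 3));                  // 3
        System.out.println(safeChunkCount(7, 3));                   // 3
        System.out.println(naiveChunkCount(Integer.MAX_VALUE, 10)); // -214748364 (overflowed)
        System.out.println(safeChunkCount(Integer.MAX_VALUE, 10));  // 214748365
    }
}
```

The `n == fullChunks ? size : (n + 1) * length` branch in ofSubLists serves the same purpose for the end index: the multiplication is never evaluated for the last chunk, where it could exceed Integer.MAX_VALUE.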


If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements will be combined in a single batch), you may use a collector like this (thanks to @sibnick for inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                   Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

Usage example:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

Result:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

Such a collector is perfectly thread-safe and produces ordered batches for a sequential stream.

If you want to apply an intermediate transformation for every batch, you may use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize, 
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
            Collectors.toList()));

Upvotes: 58

sibnick

Reputation: 4305

I think it is possible with some sort of hack inside.

Create a utility class for batching:

public static class ConcurrentBatch {
    private AtomicLong id = new AtomicLong();
    private int batchSize;

    public ConcurrentBatch(int batchSize) {
        this.batchSize = batchSize;
    }

    public long next() {
        return (id.getAndIncrement()) / batchSize;
    }

    public int getBatchSize() {
        return batchSize;
    }
}

And a method:

public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
    ConcurrentBatch batch = new ConcurrentBatch(batchSize);
    //hack java map: extends and override computeIfAbsent
    Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
        @Override
        public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
            List<T> rs = super.computeIfAbsent(key, mappingFunction);
            //apply batchFunc to old lists, when new batch list is created
            if(rs.isEmpty()){
                for(Entry<Long, List<T>> e : entrySet()) {
                    List<T> batchList = e.getValue();
                    //todo: need to improve
                    synchronized (batchList) {
                        if (batchList.size() == batch.getBatchSize()){
                            batchFunc.accept(batchList);
                            remove(e.getKey());
                            batchList.clear();
                        }
                    }
                }
            }
            return rs;
        }
    };
    stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
            .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
            .entrySet()
            .stream()
            //map contains only unprocessed lists (size<batchSize)
            .forEach(e -> batchFunc.accept(e.getValue()));
}

Upvotes: 1

Trader001

Reputation: 1004

It seems that, as Jon Skeet has shown in his comment, it's not possible to make partitions lazy. For non-lazy partitions, I already have this code:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
    final Iterator<T> it = source.iterator();
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
    final Iterable<Stream<T>> iterable = () -> partIt;

    return StreamSupport.stream(iterable.spliterator(), false);
}
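For comparison, here is a Guava-free sketch of the same idea built on a Spliterator. The class name BatchSpliteratorSketch is made up for illustration. Each partition is still materialized in a list before it is handed out, as in the code above; what the sketch shows is that the outer stream stays lazy, so the source is only read one batch at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper for illustration; not part of Guava or the JDK.
class BatchSpliteratorSketch {

    static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size = " + size);
        }
        Spliterator<T> upstream = source.spliterator();
        Spliterator<Stream<T>> batches = new Spliterators.AbstractSpliterator<Stream<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super Stream<T>> action) {
                List<T> batch = new ArrayList<>(size);
                // Pull at most `size` elements from the source for this batch
                while (batch.size() < size && upstream.tryAdvance(batch::add)) {
                    // batch::add has already stored the element
                }
                if (batch.isEmpty()) {
                    return false; // source exhausted
                }
                action.accept(batch.stream());
                return true;
            }
        };
        return StreamSupport.stream(batches, false);
    }

    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> source = Stream.of(0, 1, 2, 3, 4, 5, 6)
                .peek(x -> pulled.incrementAndGet());

        Optional<List<Integer>> first = partition(source, 3)
                .map(s -> s.collect(Collectors.toList()))
                .findFirst();

        System.out.println(first.get());  // [0, 1, 2]
        System.out.println(pulled.get()); // 3: only the first batch was read from the source
    }
}
```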

Upvotes: 6
