Eyal Schneider

Reputation: 22446

Java Streams - grouping items on sorted streams efficiently

I am looking for a way to implement a non-terminal grouping operation, such that the memory overhead will be minimal.

For example, consider distinct(). In the general case, it has no choice but to collect all distinct items, and only then stream them forward. However, if we know that the input stream is already sorted, the operation could be done "on-the-fly", using minimal memory.

I know I can achieve this for iterators by writing an iterator wrapper and implementing the grouping logic myself. Is there a simpler way to implement this using the streams API instead?
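For reference, one possible shape of the iterator-wrapper approach mentioned above (names are illustrative; it assumes a sorted, non-null source):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class IteratorDedup {
    // Illustrative iterator wrapper: skips consecutive repeats, so on a
    // sorted source it only needs O(1) state (the previously returned element).
    static class DedupIterator<T> implements Iterator<T> {
        private final Iterator<T> src;
        private T next;
        private boolean hasNext;

        DedupIterator(Iterator<T> src) {
            this.src = src;
            advance(null);
        }

        // find the next element that differs from prev
        private void advance(T prev) {
            while (src.hasNext()) {
                T candidate = src.next();
                if (prev == null || !candidate.equals(prev)) {
                    next = candidate;
                    hasNext = true;
                    return;
                }
            }
            hasNext = false;
        }

        @Override public boolean hasNext() { return hasNext; }

        @Override public T next() {
            if (!hasNext) throw new NoSuchElementException();
            T result = next;
            advance(result);
            return result;
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> it = new DedupIterator<>(
            Arrays.asList(1, 1, 3, 3, 3, 4, 4, 5).iterator());
        while (it.hasNext()) System.out.println(it.next());
    }
}
```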

--EDIT--

I found a way to abuse Stream.flatMap(..) to achieve this:

  // Stateful mapper: emits a value only if it differs from the previous one
  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev; // null until the first element is seen

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev) ? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }
  }

And then:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

Which prints:

1
3
4
5

With some changes, the same technique can be used for any kind of memory-efficient sequential grouping of streams. Anyway, I don't much like this solution, and I was looking for something more natural (like the way mapping or filtering work, for example). Furthermore, I'm breaking the contract here, because the function supplied to flatMap(..) is stateful.
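To illustrate the "some changes" part, here is a sketch of the same stateful-flatMap trick adapted to a grouping operation: collapsing each run of equal values in a sorted stream into its sum. The `RunSummer`/`flush` names are made up for this example, and it breaks the flatMap contract just like `DedupSeq`, so it is sequential-only:

```java
import java.util.Arrays;
import java.util.function.IntFunction;
import java.util.stream.IntStream;

public class RunSums {
    // Stateful mapper: accumulates a run of equal values and emits the
    // previous run's sum each time a new value starts.
    static class RunSummer implements IntFunction<IntStream> {
        private Integer prev; // null until the first element is seen
        private int sum;      // running sum of the current run

        @Override
        public IntStream apply(int value) {
            if (prev != null && value == prev) {
                sum += value;              // still inside the same run
                return IntStream.empty();
            }
            IntStream out = (prev == null) ? IntStream.empty() : IntStream.of(sum);
            prev = value;
            sum = value;
            return out;
        }

        IntStream flush() {                // the last run is never emitted by apply
            return prev == null ? IntStream.empty() : IntStream.of(sum);
        }
    }

    public static void main(String[] args) {
        RunSummer rs = new RunSummer();
        int[] sums = IntStream.concat(
                IntStream.of(1, 1, 3, 3, 3, 4, 4, 5).flatMap(rs),
                IntStream.of(0).flatMap(i -> rs.flush()) // defer flush until traversal
            ).toArray();
        System.out.println(Arrays.toString(sums)); // [2, 9, 8, 5]
    }
}
```

Note the extra awkwardness: because `apply` only emits a run when the *next* run starts, the final run has to be flushed through a second, lazily traversed stream segment.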

Upvotes: 8

Views: 1056

Answers (2)

Holger

Reputation: 298123

If you want a solution that doesn’t add mutable state to a function that isn’t supposed to have it, you may resort to collect:

static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); });
}

This works, as it is the intended way of using mutable containers. However, it can't work in parallel, since splitting at arbitrary stream positions implies that the same value may be encountered in two (or even more) threads.

If you want a general-purpose IntStream rather than a forEach-style action, a low-level Spliterator solution is preferred, despite the added complexity.

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null;
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

It even inherits parallel support, though that works by prefetching some values before processing them in another thread, so it won't accelerate the distinct operation itself, but possibly follow-up operations, if there are computation-intensive ones.
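As a usage sketch, the returned stream composes with further operations like any other IntStream. The version below is condensed from the approach above to just tryAdvance plus getComparator (sequential use only):

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class SpliteratorDistinctDemo {
    static IntStream distinctForSorted(IntStream s) {
        Spliterator.OfInt sp = s.spliterator();
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
                Spliterator.DISTINCT | Spliterator.SORTED
                | Spliterator.NONNULL | Spliterator.ORDERED) {
                long last = Long.MIN_VALUE; // sentinel below any int
                @Override
                public boolean tryAdvance(IntConsumer action) {
                    long prev = last;
                    // pull from the source until a new value is passed through
                    do {
                        if (!sp.tryAdvance((int i) -> {
                            if (i != last) { last = i; action.accept(i); }
                        })) return false;
                    } while (prev == last);
                    return true;
                }
                @Override
                public Comparator<? super Integer> getComparator() {
                    return null; // natural order, required when SORTED is reported
                }
            }, false);
    }

    public static void main(String[] args) {
        // the result is an ordinary IntStream, so follow-up ops compose:
        int sum = distinctForSorted(IntStream.of(1, 1, 3, 3, 3, 4, 4, 5))
                      .map(i -> i * 10).sum();
        System.out.println(sum); // (1+3+4+5)*10 = 130
    }
}
```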


For completeness, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams which doesn't rely on "boxing plus HashMap" and thus may have a much better memory footprint:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

It works for non-negative int values only; expanding it to the full 32-bit range would require two BitSets and thus wouldn't look as concise, but often the use case allows limiting the storage to the 31-bit range or even lower…
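A hypothetical sketch of that two-BitSet extension: negative values are stored bit-flipped (`~i` maps -1…Integer.MIN_VALUE to 0…Integer.MAX_VALUE, so small negative values stay at small bit indices), and the negative set is walked from its highest bit down so the overall result still comes out in ascending order:

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

public class FullRangeDistinct {
    static IntStream distinct(IntStream s) {
        BitSet pos = new BitSet(), neg = new BitSet();
        s.forEach(i -> { if (i >= 0) pos.set(i); else neg.set(~i); });
        // emit negatives in ascending numeric order = descending bit order
        IntStream.Builder negOut = IntStream.builder();
        for (int b = neg.length() - 1; b >= 0; b = neg.previousSetBit(b - 1))
            negOut.add(~b);
        return IntStream.concat(negOut.build(), pos.stream());
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
            distinct(IntStream.of(-3, -3, -1, 0, 0, 2, 2)).toArray()));
        // [-3, -1, 0, 2]
    }
}
```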

Upvotes: 4

the8472

Reputation: 43042

The way to do this properly would be to turn the stream into a spliterator, then wrap it depending on the properties of the returned spliterator:

  • performs naive deduplication using a concurrent set if the source is neither sorted nor distinct
  • performs optimized deduplication if the source spliterator is sorted.
    Supporting trySplit will be tricky, as the wrapper may have to advance the sub-spliterator a few steps until it can be sure it is not seeing the tail of a run of non-distinct elements.
  • just returns the spliterator as-is if the source is already distinct

Once you have that spliterator, you can turn it back into a stream with the same properties and continue to do stream operations on it.

Since we can't modify existing JDK interfaces, the helper API would have to look more like this: dedup(IntStream.of(...).map(...)).collect(...).
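A sketch of that dispatch might look as follows. The `dedup` and `sortedDedup` names are illustrative; the sorted path assumes ascending natural order and punts on trySplit, per the caveat above:

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class DedupHelper {
    // Pick a deduplication strategy from the source spliterator's characteristics.
    static IntStream dedup(IntStream s) {
        Spliterator.OfInt sp = s.spliterator();
        if (sp.hasCharacteristics(Spliterator.DISTINCT))
            return StreamSupport.intStream(sp, false);          // nothing to do
        if (sp.hasCharacteristics(Spliterator.SORTED))
            return sortedDedup(sp);                             // constant memory
        return StreamSupport.intStream(sp, false).distinct();   // naive fallback
    }

    // Sorted path: skip runs of equal values on the fly.
    static IntStream sortedDedup(Spliterator.OfInt sp) {
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                sp.estimateSize(), Spliterator.ORDERED | Spliterator.DISTINCT) {
            long last = Long.MIN_VALUE; // sentinel below any int
            @Override
            public boolean tryAdvance(IntConsumer action) {
                long prev = last;
                do {
                    if (!sp.tryAdvance((int i) -> {
                        if (i != last) { last = i; action.accept(i); }
                    })) return false;
                } while (prev == last);
                return true;
            }
        }, false);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
            dedup(IntStream.of(1, 1, 2, 3, 3).sorted()).toArray())); // sorted path
        System.out.println(Arrays.toString(
            dedup(IntStream.of(3, 1, 2, 3)).toArray()));             // fallback path
    }
}
```

The fallback branch keeps encounter order (3, 1, 2), while the sorted branch streams through in constant memory.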


If you inspect the source of java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>) you will notice that the JDK more or less does that for reference-based streams.

It is just the IntStream implementation (java.util.stream.IntPipeline.distinct()) that takes an inefficient approach and does not take advantage of DISTINCT or SORTED.

It just blindly converts an IntStream to a boxed Integer stream and uses the reference-based deduplication without passing along the appropriate flags that would make it memory-efficient.

If this isn't already fixed in JDK 9, it might be worth filing a bug, since it's essentially unnecessary memory consumption and wasted optimization potential for the stream ops when they needlessly discard stream flags.

Upvotes: 1
