Dims

Reputation: 51239

Is it possible to "merge" elements of Java 8 streams?

Can I somehow analyse the previous and/or next elements with Java 8 streams?

For example, can I count identical adjacent numbers?

import java.util.stream.Stream;

public class Merge {
   public static void main(String[] args) {

      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

      // How to get 3, 2, 2, 4 from above

   }
}

Upvotes: 5

Views: 4080

Answers (5)

tevemadar

Reputation: 13225

Stream::collect() can do that for you. A hack is applied here for brevity: since both the inputs and the outputs are numbers, int in particular, the intermediate storage can be an int[2], where the first element is the value being counted (so 0 and 1 in the example), and the second element is the counter. Later in the post there will be "real" value-counter pairs.

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

List<Integer> result = stream.collect(
    ArrayList<int[]>::new,
    (list, i) -> {
        if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
            list.add(new int[] { i, 1 });
        else
            list.get(list.size() - 1)[1]++;
    },
    (l1, l2) -> {
        if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
            l1.addAll(l2);
        else {
            l1.get(l1.size() - 1)[1] += l2.get(0)[1];
            l1.addAll(l2.subList(1, l2.size()));
        }
    }
).stream().map(pair -> pair[1]).collect(Collectors.toList());

System.out.println(result);

ArrayList<int[]>::new is the supplier; it creates the intermediate storage(s) when needed. It is similar to the "identity" in reduce(), except that it can be invoked repeatedly to create a fresh container for each partial result.

The accumulator function (the (list, i) thing) checks whether the list is empty or its last element is counting something other than i, and in that case adds a new pair, initialized with i as the value and 1 as the count. Otherwise it just increments the existing counter, as it is counting the same kind of element as i. collect() performs mutable accumulation, so nothing is returned (unlike with reduce()).

Then there is a "combiner" (the (l1, l2) thing), which has to be able to combine two partial results into one (into the first of the two). Here we have to be prepared for a partial result ending with the same value the next partial result begins with; that is what the if checks: the two lists can be "blindly" appended if either of them is empty (unlikely, but who knows), or if the last element of the first list is counting something other than the first element of the second list (here it is also handy that we already know the lists are not empty, so those last/first elements exist). Otherwise we have to update the last element of the first list (adding the counter of the first element of the second list) and append only the remaining elements.

And as we have a list of int[2]s at this point, a separate map-collect pair strips them down to the counter part, which is what we want.

The printed output is [3, 2, 2, 4], by the way.

How could such partial results, and the need for combining them, arise? One possible case is that this thing can work in parallel. Here is a variant with some logging, and also with a "proper" pair object instead of int[2]. It is not that proper, but Map.Entry<K,V> can be used as a pair. It is a bit clumsier (requiring getValue()/setValue() instead of ++), but now the input could be anything, not just numbers. The input has been changed too, but only for logging purposes; it works with the original as well.

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 2, 2, 3, 3, 3, 3);

System.out.println(
    stream.parallel().collect(
        ArrayList<Map.Entry<Integer, Integer>>::new,
        (list, i) -> {
            System.out.println("acc " + list + " " + i + " " + Thread.currentThread());
            if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(i))
                list.add(new AbstractMap.SimpleEntry<Integer, Integer>(i, 1));
            else {
                var p = list.get(list.size() - 1);
                p.setValue(p.getValue() + 1);
            }
        }, (l1, l2) -> {
            System.out.println("comb " + l1 + " " + l2 + " " + Thread.currentThread());
            if (l1.isEmpty() || l2.isEmpty() || !l1.get(l1.size() - 1).getKey().equals(l2.get(0).getKey()))
                l1.addAll(l2);
            else {
                var p = l1.get(l1.size() - 1);
                p.setValue(p.getValue() + l2.get(0).getValue());
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
    )
);

It may need a couple runs, but there are times when it actually runs multi-threaded, producing an output like this:

acc [] 2 Thread[main,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[main,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [2=1] [3=1] Thread[main,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [1=1] [1=1] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 2 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=1] [2=1, 3=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [1=2] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [3=1] [3=1] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [3=1] [3=2] Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [0=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=2] [0=1, 1=2] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=2, 3=1] [3=3] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [0=3, 1=2] [2=2, 3=4] Thread[ForkJoinPool.commonPool-worker-5,5,main]
[0=3, 1=2, 2=2, 3=4]

What is visible in this particular run is that all 11 input values were accumulated separately (the acc [] x lines, where [] shows that a brand-new empty list was passed in), on one of 4 threads (the main thread and worker threads 3, 5 and 7). Those initial steps happen in a fairly arbitrary order; then the results are combined (though order is maintained here), so the special combining step (where lists are not simply appended, but a counter has to be updated) is exercised quite often.
The final pair-to-number transformation is skipped here, which is why both the elements and their counts are printed.

For comparison, the same code without the parallel() call simply uses the accumulator function, stepping through the input stream sequentially. The combiner is never invoked for a sequential stream, regardless of input size.

acc [] 0 Thread[main,5,main]
acc [0=1] 0 Thread[main,5,main]
acc [0=2] 0 Thread[main,5,main]
acc [0=3] 1 Thread[main,5,main]
acc [0=3, 1=1] 1 Thread[main,5,main]
acc [0=3, 1=2] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=1] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=1] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=3] 3 Thread[main,5,main]
[0=3, 1=2, 2=2, 3=4]

Upvotes: 1

Fabio O. Padilha

Reputation: 119

You can use the reduce function to merge the items into a TreeMap; if you want only the counts, take the values of the map. Be aware that, because the map is keyed by the element value, a value that starts a new run later in the stream overwrites its earlier count, so this approach only works when each distinct value forms a single contiguous run.

import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class Merge {
   public static void main(String[] args) {

      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

      Map<Integer,Integer> map = stream.reduce(new TreeMap<Integer,Integer>(), (acc, n) -> {
          if (acc.isEmpty() || !acc.lastKey().equals(n))
              acc.put(n, 1);
          else {
              acc.put(acc.lastKey(), acc.lastEntry().getValue() + 1);
          }
          return acc;
      }, (acc1, acc2) -> acc1);

      Collection<Integer> numbers = map.values();

   }
}

Upvotes: 0

If you want it to be lazy, you have to escape the Stream API through Stream.iterator() or Stream.spliterator().

Otherwise the way to do it is to call the terminal operation Stream.collect(Collector) with a custom collector, which will consume the whole stream.
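As a sketch of that eager route (this block is illustrative, not part of the original answer; the class name RunLengthCollector and the int[] pair representation are assumptions), the run-length counting can be packaged as a custom Collector:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class RunLengthCollector {
    // Builds a Collector that turns a stream into the lengths of its runs of
    // equal adjacent elements. Intermediate state is a list of {value, count}
    // pairs, stored as int[2] for brevity.
    static Collector<Integer, ?, List<Integer>> runLengths() {
        return Collector.of(
            ArrayList<int[]>::new,
            (acc, i) -> {
                if (acc.isEmpty() || acc.get(acc.size() - 1)[0] != i)
                    acc.add(new int[] { i, 1 });      // a new run starts
                else
                    acc.get(acc.size() - 1)[1]++;     // the current run continues
            },
            (l1, l2) -> {
                // merge the boundary runs when two partial results meet
                if (!l1.isEmpty() && !l2.isEmpty() && l1.get(l1.size() - 1)[0] == l2.get(0)[0])
                    l1.get(l1.size() - 1)[1] += l2.remove(0)[1];
                l1.addAll(l2);
                return l1;
            },
            acc -> {
                List<Integer> counts = new ArrayList<>();
                for (int[] pair : acc)
                    counts.add(pair[1]);              // keep only the counts
                return counts;
            });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1)
                .collect(runLengths()));              // prints [3, 2, 2, 4]
    }
}
```

The combiner makes this collector safe for parallel streams as well, for the same reason explained in tevemadar's answer.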


@Test
public void test() {
    Stream<Integer> input = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

    UniqCountSpliterator uniqCountSpliterator = new UniqCountSpliterator(input.spliterator());

    long[] output = uniqCountSpliterator.stream()
            .toArray();

    long[] expected = {3, 2, 2, 4};

    assertArrayEquals(expected, output);
}

import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

public class UniqCountSpliterator implements Spliterator.OfLong {
    private final Spliterator<?> wrapped;
    private long count;
    private Object previous;
    private Object current;

    public UniqCountSpliterator(Spliterator<?> wrapped) {
        this.wrapped = wrapped;
    }

    public LongStream stream() {
        return StreamSupport.longStream(this, false);
    }

    @Override
    public OfLong trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return NONNULL | IMMUTABLE;
    }

    @Override
    public boolean tryAdvance(LongConsumer action) {
        while (wrapped.tryAdvance(next -> current = next)) {
            if (previous == null || current.equals(previous)) {
                // still inside the current run
                count++;
                previous = current;
            } else {
                // run ended: emit its length; 'current' already starts the next run
                action.accept(count);
                count = 1;
                previous = current;
                return true;
            }
        }
        // wrapped spliterator exhausted: emit the final run, if there is one
        // (the original version dropped a trailing run of length 1 here)
        if (count > 0) {
            action.accept(count);
            count = 0;
            previous = null;
            return true;
        }
        return false;
    }
}

Upvotes: 5

Thomas Shields

Reputation: 8943

If you don't mind two statements, you can set up a list to fill with the counts, and then use reduce (note that this relies on side effects from within reduce, so it must not be run in parallel):

List<Integer> counts = new ArrayList<>();
Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).reduce((i, j) -> {
    if (counts.isEmpty()) {
        counts.add(1);
    }

    if (j.equals(i)) {
        int index = counts.size() - 1;
        counts.set(index, counts.get(index) + 1);
    } else {
        counts.add(1);
    }
    return j;
});

Upvotes: 0

Artur Biesiadowski

Reputation: 3698

You can almost do it with flatMap. It would work for infinite streams; with a finite stream, I don't see a way to detect the end of the stream from within it.

    Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

    Stream<Integer> flatMap = stream.flatMap(new Function<Integer, Stream<Integer>>() {
        Integer prev = null;
        int count;
        public Stream<Integer> apply(Integer i) {
            if (i.equals(prev)) {
                count++;
                return Stream.empty();
            } else {
                int c = count;
                count = 1;
                prev = i;
                if (c > 0) {
                    return Stream.of(c);
                } else {
                    return Stream.empty();
                }
            }
        }
    });

    flatMap.forEach(i -> {
        System.out.println(i);
    });
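One possible workaround for the finite-stream case (a sketch, not part of the original answer; the sentinel value -1 and the class name FlatMapRuns are assumptions, and the sentinel must be a value that never occurs in the data): concatenate a trailing sentinel, so the stateful flatMap flushes the final run before the stream ends.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapRuns {
    static List<Integer> runLengths(Stream<Integer> stream) {
        // Append a sentinel (-1, assumed absent from the data) so that the
        // last real run is flushed when the sentinel arrives.
        return Stream.concat(stream, Stream.of(-1))
            .flatMap(new Function<Integer, Stream<Integer>>() {
                Integer prev = null;
                int count;
                public Stream<Integer> apply(Integer i) {
                    if (i.equals(prev)) {
                        count++;
                        return Stream.empty();
                    }
                    int c = count;   // length of the run that just ended (0 before the first)
                    count = 1;
                    prev = i;
                    return c > 0 ? Stream.of(c) : Stream.empty();
                }
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runLengths(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1)));
        // prints [3, 2, 2, 4]
    }
}
```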

That said, you could probably get a lot more mileage out of RxJava for this kind of thing (where you could use a Subject to emit values as you wish, and be able to detect the end of the stream).

Of course, if you want to escape Stream boundaries, there are many options, as indicated by Christoffer's answer.

Upvotes: 1
