Reputation: 51239
Can I analyse previous and/or next elements with Java 8 streams somehow?
For example, can I count identical adjacent numbers?
import java.util.stream.Stream;

public class Merge {
    public static void main(String[] args) {
        Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
        // How to get 3, 2, 2, 4 from the stream above?
    }
}
Upvotes: 5
Views: 4080
Reputation: 13225
Stream::collect() can do that for you. A hack is applied here for brevity: as both the inputs and the outputs are numbers, int in particular, the intermediate storage can be an int[2], where the first element is the thing being counted (so 0 and 1 in the example) and the second element is the counter. Later in the post there will be "real" something-counter pairs.
Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
List<Integer> result = stream.collect(
        ArrayList<int[]>::new,
        (list, i) -> {
            if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
                list.add(new int[] { i, 1 });
            else
                list.get(list.size() - 1)[1]++;
        },
        (l1, l2) -> {
            if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
                l1.addAll(l2);
            else {
                l1.get(l1.size() - 1)[1] += l2.get(0)[1];
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
).stream().map(pair -> pair[1]).collect(Collectors.toList());
System.out.println(result);
ArrayList<int[]>::new is the supplier: it creates the intermediate storage(s) when needed. It is similar to the "identity" in reduce(), except that it can be invoked several times.

The accumulator function (the (list, i) part) checks whether the list is empty or its last element is counting something other than i, and in that case adds a new pair, initialized with i as the element and 1 as the count. Otherwise it just increments the existing counter, since it is counting the same kind of element as i. collect() performs mutable accumulation, so nothing is returned (unlike with reduce()).
Then there is the "combiner" (the (l1, l2) part), which has to be able to combine two partial results into one (into the first of the two). Here we have to be prepared for a partial result ending with the same kind of element that begins the next partial result; that is what the if checks: the two lists can be "blindly" appended if either of them is empty (unlikely, but who knows), or if the last element of the first list is counting something other than the first element of the second list (here it is also handy that we already know the lists are not empty, so those last/first elements exist). Otherwise we have to update the last element of the first list (adding the counter of the first element of the second list to it) and append only the remaining elements.
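For a concrete feel of the combiner, consider two hypothetical partial results (made-up values, not taken from any particular run) whose boundary falls inside a run of 0s:

List<int[]> l1 = new ArrayList<>(List.of(new int[] { 0, 3 }));
List<int[]> l2 = new ArrayList<>(List.of(new int[] { 0, 1 }, new int[] { 1, 2 }));
// The combiner above adds the counters of the boundary pairs and appends
// the rest, leaving l1 as [{0, 4}, {1, 2}].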
As we have a list of int[2]s at this point, a separate map-collect pair strips them down to the counter part, which is what we want. The printed output is [3, 2, 2, 4], by the way.
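As an aside (an assumption on my part, not code from the original answer), the same three pieces, plus the stripping step as a finisher, can be packaged into a reusable java.util.stream.Collector via Collector.of():

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RunLengthCollector {

    // Hypothetical packaging of the pieces above into a single Collector.
    static Collector<Integer, List<int[]>, List<Integer>> runLengths() {
        return Collector.of(
                ArrayList::new,                              // supplier
                (list, i) -> {                               // accumulator
                    if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
                        list.add(new int[] { i, 1 });
                    else
                        list.get(list.size() - 1)[1]++;
                },
                (l1, l2) -> {                                // combiner must return its result
                    if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
                        l1.addAll(l2);
                    else {
                        l1.get(l1.size() - 1)[1] += l2.get(0)[1];
                        l1.addAll(l2.subList(1, l2.size()));
                    }
                    return l1;
                },
                list -> list.stream().map(p -> p[1]).collect(Collectors.toList()) // finisher
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1)
                .collect(runLengths())); // [3, 2, 2, 4]
    }
}

Packaged this way, the call site shrinks to stream.collect(runLengths()).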
How could such partial results, and the need to combine them, arise? One possible case is that this thing can work in parallel. Here is a variant with some logging, and also with a "proper" pair object instead of int[2]. It is not that proper, but Map.Entry<K, V> can be used as a pair. It is a bit clumsier (requiring getValue()/setValue() instead of ++), but now the input could be anything, not just numbers. The input has been changed too, though only for logging purposes; it works with the original as well.
Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 2, 2, 3, 3, 3, 3);
System.out.println(
        stream.parallel().collect(
                ArrayList<Map.Entry<Integer, Integer>>::new,
                (list, i) -> {
                    System.out.println("acc " + list + " " + i + " " + Thread.currentThread());
                    // equals() rather than != : the keys are Integer objects now
                    if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(i))
                        list.add(new AbstractMap.SimpleEntry<Integer, Integer>(i, 1));
                    else {
                        var p = list.get(list.size() - 1);
                        p.setValue(p.getValue() + 1);
                    }
                }, (l1, l2) -> {
                    System.out.println("comb " + l1 + " " + l2 + " " + Thread.currentThread());
                    if (l1.isEmpty() || l2.isEmpty() || !l1.get(l1.size() - 1).getKey().equals(l2.get(0).getKey()))
                        l1.addAll(l2);
                    else {
                        var p = l1.get(l1.size() - 1);
                        p.setValue(p.getValue() + l2.get(0).getValue());
                        l1.addAll(l2.subList(1, l2.size()));
                    }
                }
        )
);
It may need a couple of runs, but there are times when it actually runs multi-threaded, producing output like this:
acc [] 2 Thread[main,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[main,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [2=1] [3=1] Thread[main,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [1=1] [1=1] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 2 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=1] [2=1, 3=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [1=2] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [3=1] [3=1] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [3=1] [3=2] Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [0=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=2] [0=1, 1=2] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=2, 3=1] [3=3] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [0=3, 1=2] [2=2, 3=4] Thread[ForkJoinPool.commonPool-worker-5,5,main]
[0=3, 1=2, 2=2, 3=4]
What is visible in this particular run is that all 11 input values were accumulated separately (the acc [] x lines, where [] shows that a brand-new empty list was passed in), on one of four threads (the main thread and worker threads 3, 5 and 7). These initial steps happen in a fairly arbitrary order; the results are then combined (here the order is maintained), so the special combining step (where lists are not simply appended, but a counter needs to be updated) is exercised quite often.

The final pair-to-number transformation is skipped here; that is why the elements and their counts are both printed.
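If only the counts were wanted here as well, a finishing step analogous to the int[2] variant could be appended (a sketch; collected is a stand-in name for the list returned by the collect() call above):

// 'collected' stands for the List<Map.Entry<Integer, Integer>> built above.
List<Integer> counts = collected.stream()
        .map(Map.Entry::getValue)
        .collect(Collectors.toList()); // [3, 2, 2, 4]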
For comparison, the same code without the parallel() call simply applies the accumulator function, stepping through the input stream sequentially. In that case the combiner is never invoked, regardless of input size (at least in the reference implementation, sequential collect() performs accumulation only):
acc [] 0 Thread[main,5,main]
acc [0=1] 0 Thread[main,5,main]
acc [0=2] 0 Thread[main,5,main]
acc [0=3] 1 Thread[main,5,main]
acc [0=3, 1=1] 1 Thread[main,5,main]
acc [0=3, 1=2] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=1] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=1] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=3] 3 Thread[main,5,main]
[0=3, 1=2, 2=2, 3=4]
Upvotes: 1
Reputation: 119
You can use the reduce function to merge the items into a TreeMap. If you want only the counts, you can take the values of the map.
public class Merge {
    public static void main(String[] args) {
        Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
        Map<Integer, Integer> map = stream.reduce(new TreeMap<Integer, Integer>(), (m, n) -> {
            // 'm' rather than 'map': a lambda parameter may not shadow the local variable
            if (m.isEmpty() || !m.lastKey().equals(n))
                m.put(n, 1);
            else
                m.put(m.lastKey(), m.lastEntry().getValue() + 1);
            return m;
        }, (m1, m2) -> m1);
        Collection<Integer> numbers = map.values();
    }
}
Upvotes: 0
Reputation: 27864
If you want it to be lazy, you have to escape the Stream API through Stream.iterator() or Stream.spliterator(). Otherwise the way to do it is to call the terminal operation Stream.collect(Collector) with a custom collector, which will consume the whole stream.
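For reference, the Stream.iterator() escape hatch is just plain imperative code over the stream's elements (a sketch, not part of the original answer's solution):

// Imperative run-length counting via the iterator() escape hatch.
Iterator<Integer> it = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).iterator();
List<Long> counts = new ArrayList<>();
Integer prev = null;
long run = 0;
while (it.hasNext()) {
    Integer next = it.next();
    if (prev != null && !next.equals(prev)) {
        counts.add(run); // a run ended, record its length
        run = 0;
    }
    prev = next;
    run++;
}
if (prev != null) {
    counts.add(run); // flush the final run
}
System.out.println(counts); // [3, 2, 2, 4]

The rest of this answer takes the spliterator route instead, which keeps the result itself a (lazy) stream: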
@Test
public void test() {
    Stream<Integer> input = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
    UniqCountSpliterator uniqCountSpliterator = new UniqCountSpliterator(input.spliterator());
    long[] output = uniqCountSpliterator.stream().toArray();
    long[] expected = {3, 2, 2, 4};
    assertArrayEquals(expected, output);
}
import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

public class UniqCountSpliterator implements Spliterator.OfLong {

    private Spliterator<?> wrapped;
    private long count;
    private Object previous;
    private Object current;

    public UniqCountSpliterator(Spliterator<?> wrapped) {
        this.wrapped = wrapped;
    }

    public LongStream stream() {
        return StreamSupport.longStream(this, false);
    }

    @Override
    public OfLong trySplit() {
        return null; // no parallel decomposition
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // size unknown
    }

    @Override
    public int characteristics() {
        return NONNULL | IMMUTABLE;
    }

    @Override
    public boolean tryAdvance(LongConsumer action) {
        // Pull elements as long as they equal the previous one; when the loop
        // exits, 'current' holds the first element of the next run (if any).
        while (wrapped.tryAdvance(next -> current = next) && (null == previous || current.equals(previous))) {
            count++;
            previous = current;
        }
        if (previous == null) {
            return false; // the wrapped spliterator was already exhausted
        }
        action.accept(count);
        count = 1; // 'current', the start of the next run, is already consumed
        previous = null;
        return true;
    }
}
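Since tryAdvance() only pulls as many elements from the wrapped spliterator as the current run needs, this also works lazily on an infinite source. A quick sketch (not part of the original answer; the doubled iterate stream is just an arbitrary infinite input with runs of two):

// Infinite input 0, 0, 1, 1, 2, 2, ... — every value appears in a run of two.
Stream<Integer> infinite = Stream.iterate(0, i -> i + 1).flatMap(i -> Stream.of(i, i));
long[] firstFive = new UniqCountSpliterator(infinite.spliterator())
        .stream()
        .limit(5)
        .toArray(); // {2, 2, 2, 2, 2}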
Upvotes: 5
Reputation: 8943
If you don't mind two statements, you can set up a list to fill with the counts, and then use reduce:
List<Integer> counts = new ArrayList<>();
Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).reduce((i, j) -> {
    if (counts.isEmpty()) {
        counts.add(1);
    }
    if (j.equals(i)) { // equals() rather than ==, which would compare Integer references
        int index = counts.size() - 1;
        counts.set(index, counts.get(index) + 1);
    } else {
        counts.add(1);
    }
    return j;
});
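Printing the list afterwards shows the run lengths. One caveat (an observation beyond the original answer): the reduction mutates counts as a side effect, so this is only predictable on a sequential stream.

System.out.println(counts); // [3, 2, 2, 4]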
Upvotes: 0
Reputation: 3698
You can almost do it with flatMap. It would work for infinite streams; with a finite stream I don't see a way to detect the end of the stream from within it.
Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
Stream<Integer> flatMap = stream.flatMap(new Function<Integer, Stream<Integer>>() {
    Integer prev = null;
    int count;

    public Stream<Integer> apply(Integer i) {
        if (i.equals(prev)) {
            count++;
            return Stream.empty();
        } else {
            int c = count;
            count = 1;
            prev = i;
            if (c > 0) {
                return Stream.of(c);
            } else {
                return Stream.empty();
            }
        }
    }
});
flatMap.forEach(System.out::println);
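One possible workaround for the missing final count (an assumption of mine, not from the original answer) is to append a sentinel that cannot occur in the data, so the last run gets flushed too:

// Sketch: append a null sentinel and compare with Objects.equals() so the
// final run's count (4 here) is emitted as well. Assumes the data itself
// contains no nulls.
Stream<Integer> input = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
Stream.concat(input, Stream.of((Integer) null))
        .flatMap(new Function<Integer, Stream<Integer>>() {
            Integer prev;
            int count;

            public Stream<Integer> apply(Integer i) {
                if (Objects.equals(i, prev)) {
                    count++;
                    return Stream.empty();
                }
                int c = count;
                count = 1;
                prev = i;
                return c > 0 ? Stream.of(c) : Stream.empty();
            }
        })
        .forEach(System.out::println); // 3, 2, 2, 4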
That said, you could probably get a lot better mileage out of RxJava for this kind of thing (where you could use a Subject to emit values as you wish and be able to detect the end of the stream).

Of course, if you want to escape the Stream boundaries, there are many options, as indicated by Christoffer's answer.
Upvotes: 1