Spille

Reputation: 597

Using a reduce operation on a Java 8 Stream to group adjacent entries with the same key

I've got a java.util.stream.Stream containing key/value pairs like:

<1,3> <1,5> <3,1> <4,2> <4,7> <4,8>

Now I would like to merge all entries that have the same key:

<1,[3,5]> <3,[1]> <4,[2,7,8]>

The data is already sorted, so only consecutive datasets have to be merged.

Now I'm searching for a way to transform the content of the stream as shown above, without loading all datasets into memory.

I'd prefer to get a java.util.stream.Stream as the result, with a different object type that contains a list of values instead of a single value.

My only approach so far is a custom iterator that performs the merge, but it seems pretty ugly to convert to an iterator and back to a stream.
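
Roughly, the iterator-based version looks like this (just a sketch; Pair, Group and pairStream stand in for my actual types and input stream):

// Sketch of the iterator workaround: merge consecutive pairs with the same key,
// then wrap the iterator back into a Stream via a Spliterator.
Iterator<Pair> it = pairStream.iterator();

Iterator<Group> merged = new Iterator<Group>() {
    private Pair pending = it.hasNext() ? it.next() : null;

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public Group next() {
        int key = pending.getKey();
        List<Integer> values = new ArrayList<>();
        // consume all consecutive pairs sharing the current key
        while (pending != null && pending.getKey() == key) {
            values.add(pending.getValue());
            pending = it.hasNext() ? it.next() : null;
        }
        return new Group(key, values);
    }
};

Stream<Group> result = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false);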

What is the best approach for it?

Upvotes: 6

Views: 217

Answers (3)

M. Justin

Reputation: 21347

The groupRuns method in the StreamEx library can be used to do this:

record Pair(int key, int value) {}

final List<Pair> list = List.of(
        new Pair(1, 11), new Pair(1, 12),
        new Pair(2, 13), new Pair(2, 14), new Pair(2, 15),
        new Pair(3, 16)
);

Stream<List<Pair>> stream = StreamEx.of(list.stream())
        .groupRuns((p1, p2) -> p1.key() == p2.key());

Resulting stream elements:

- [(1, 11), (1, 12)]
- [(2, 13), (2, 14), (2, 15)]
- [(3, 16)]
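
If the result should be a key together with its list of values, as in the question, each run can be mapped further. A sketch, using a hypothetical KeyWithValues record:

record KeyWithValues(int key, List<Integer> values) {}

Stream<KeyWithValues> grouped = StreamEx.of(list.stream())
        .groupRuns((p1, p2) -> p1.key() == p2.key())
        .map(run -> new KeyWithValues(
                run.get(0).key(),                         // all pairs in a run share the key
                run.stream().map(Pair::value).toList())); // collect just the values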

Upvotes: 0

M. Justin

Reputation: 21347

The following is a solution using a custom Gatherer with the upcoming Java 24. As requested, it does not load the whole dataset into memory – only a single run of identical keys is kept at a given time.

record Pair(int key, int value) {
    @Override public String toString() { return "<%s,%s>".formatted(key, value); }
}
List<Pair> list = List.of(new Pair(1, 3), new Pair(1, 5), new Pair(3, 1),
        new Pair(4, 2), new Pair(4, 7), new Pair(4, 8));

List<List<Pair>> groups = list.stream()
        .gather(Gatherer.<Pair, List<Pair>, List<Pair>>ofSequential(
                ArrayList::new,
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    if (state.isEmpty() || state.getFirst().key() == element.key()) {
                        state.add(element);
                        return true;
                    } else {
                        List<Pair> group = List.copyOf(state);
                        state.clear();
                        state.add(element);
                        return downstream.push(group);
                    }
                }),
                (state, downstream) -> {
                    if (!state.isEmpty()) {
                        downstream.push(List.copyOf(state));
                    }
                }))
        .toList();

// Prints [[<1,3>, <1,5>], [<3,1>], [<4,2>, <4,7>, <4,8>]]
System.out.println(groups);

This gatherer uses a mutable list as its state and collects elements into that list until it encounters one with a different key. At that point, it pushes a copy of the state, containing all elements with the current key, downstream. It then clears the state, adds the new element with the different key to it, and repeats the behavior from there. Finally, once the last element of the stream has been processed, the finisher pushes a copy of the state, which becomes the final group.
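
Since only one run is buffered at a time, the same gatherer also works on an unbounded stream. A sketch of that idea, with the gatherer pulled out into a hypothetical groupRuns helper method (my own name, not part of the JDK) so it can be reused:

// Hypothetical helper: groups consecutive elements for which sameRun returns true.
static <T> Gatherer<T, ?, List<T>> groupRuns(BiPredicate<? super T, ? super T> sameRun) {
    return Gatherer.<T, List<T>, List<T>>ofSequential(
            ArrayList::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.isEmpty() || sameRun.test(state.getLast(), element)) {
                    state.add(element);
                    return true;
                }
                List<T> group = List.copyOf(state);
                state.clear();
                state.add(element);
                return downstream.push(group);
            }),
            (state, downstream) -> {
                if (!state.isEmpty()) {
                    downstream.push(List.copyOf(state));
                }
            });
}

// Usage on an infinite source: limit(3) terminates because grouping stays lazy.
// Prints [<0,0>, <0,1>] then [<1,2>, <1,3>] then [<2,4>, <2,5>]
Stream.iterate(0, i -> i + 1)
        .map(i -> new Pair(i / 2, i))
        .gather(groupRuns((a, b) -> a.key() == b.key()))
        .limit(3)
        .forEach(System.out::println);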

Upvotes: 0

user_3380739

Reputation: 1284

Here is a solution using StreamEx.

int[][] datasets = { { 1, 3 }, { 1, 5 }, { 3, 1 }, { 4, 2 }, { 4, 7 }, { 4, 8 } };

StreamEx.of(datasets) //
        .collapse((a, b) -> a[0] == b[0], groupingBy(a -> a[0], mapping(a -> a[1], toList()))) //
        .forEach(System.out::println);

You can replace int[] with your own dataset object (a sketch with a record follows the next snippet). We can add a peek to verify that the evaluation is lazy:

StreamEx.of(datasets) //
        .peek(System.out::println) //
        .collapse((a, b) -> a[0] == b[0], groupingBy(a -> a[0], mapping(a -> a[1], toList()))) //
        .limit(1) //
        .forEach(System.out::println);
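
As mentioned above, int[] can be swapped for a dataset object. A sketch with a Pair record like the one in the other answers (groupingBy, mapping and toList are static imports from java.util.stream.Collectors):

record Pair(int key, int value) {}

// Same collapse, but on records instead of int[]; each run is collected into a
// single-entry map of key -> values, e.g. {1=[3, 5]}, {3=[1]}, {4=[2, 7, 8]}.
StreamEx.of(new Pair(1, 3), new Pair(1, 5), new Pair(3, 1),
            new Pair(4, 2), new Pair(4, 7), new Pair(4, 8))
        .collapse((a, b) -> a.key() == b.key(),
                groupingBy(Pair::key, mapping(Pair::value, toList())))
        .forEach(System.out::println);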

Upvotes: 4
