slartidan

Reputation: 21566

How to reduce a stream into another stream in Java 8?

As an example, I want to create an infinite stream of groups of ten like this:

0=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2=[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
...

I want to use an infinite stream of ints as the input, which should then be grouped. When the first stream has iterated 10 times, the resulting stream should have iterated only once.

My working, but not very elegant code looks like this:

// create a stream from 0 (inclusive) to 100 (exclusive)
IntStream.iterate(0, i -> i+1).boxed().limit(100)

// slow down
.peek((i) -> {try {Thread.sleep(50);} catch (InterruptedException e) {}})

// group by tens
/* ugly: */.collect(Collectors.groupingBy(i -> i / 10)).entrySet()
/* not working: */ //.makeSequentialGroups(i -> i / 10)

// print to console
.forEach(System.out::println);  

How do I make groups of an int stream, without having to collect and re-stream? (If possible even without having to use boxing)

Upvotes: 2

Views: 3074

Answers (4)

Hank D

Reputation: 6471

You can think of an array as a Map whose keys are primitive ints; the only difference is that you look up a value via myArray[i] instead of map.get(i). Using an array to hold the groups also avoids boxing, as you requested. Here is a solution that produces similar results without boxing.

    int[][] results = IntStream.iterate(0, i -> i + 10)
            .limit(10)
            .mapToObj(i -> IntStream.range(i, i + 10).toArray())
            .toArray(int[][]::new);

    System.out.println(Arrays.deepToString(results));

Upvotes: 0

Tagir Valeev

Reputation: 100159

This feature is available in my StreamEx library, where it is called groupRuns: it collects adjacent elements into intermediate Lists based on a supplied predicate. Example:

IntStreamEx.iterate(0, i -> i+1).boxed().limit(100)
    .peek((i) -> {try {Thread.sleep(50);} catch (InterruptedException e) {}})
    .groupRuns((a, b) -> a/10 == b/10)
    .forEach(System.out::println);

Upvotes: 2

slartidan

Reputation: 21566

It seems that if one stream is based on another stream, then it always has to have the exact same number of entries.

However, I found a partial solution to my problem: I wrapped the consumer in a "GroupingConsumer". This terminates the initial stream, but it can still run indefinitely.

The resulting code snippet:

// create a stream from 0 (inclusive) to infinity!
IntStream.iterate(0, i -> i+1).boxed()

// slow down
.peek((i) -> {try {Thread.sleep(50);} catch (InterruptedException e) {}})

// terminate the stream of single items (ungrouped)
.forEach(

    // create a wrap-around
    GroupingConsumer.create(

        // define the grouping rule
        i -> i/10,

        // the wrapped consumer
        System.out::println
)); 

The GroupingConsumer class:

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Forwards a group of items whenever the grouping key changes.
 * A group is only forwarded once an item with a different key arrives.
 *
 * @param <K> the type of the single entries
 * @param <T> the type of the grouping key
 */
class GroupingConsumer<K, T> implements Consumer<K> {

    private Function<K, T> keyCalculator;
    private Consumer<Entry<T, List<K>>> consumer;

    Entry<T, List<K>> currentGroup;

    /**
     * Wraps your consumer, so that it will get groups of items instead of single items.
     * 
     * @param keyCalculator the "grouping by"
     * @param consumer your consumer, that will be called less frequently
     * @return the wrapped consumer
     */
    public static <K, T> GroupingConsumer<K,T> create(Function<K, T> keyCalculator, Consumer<Entry<T, List<K>>> consumer) {
        GroupingConsumer<K, T> groupingConsumer = new GroupingConsumer<K, T>();
        groupingConsumer.consumer = consumer;
        groupingConsumer.keyCalculator = keyCalculator;
        return groupingConsumer;
    }

    @Override
    public void accept(K nextValue) {
        T key = keyCalculator.apply(nextValue);

        boolean newGroupRequired = false;

        if (currentGroup == null)
            newGroupRequired = true;
        else if (!currentGroup.getKey().equals(key)) {
            newGroupRequired = true;
            consumer.accept(currentGroup);
        }

        if (newGroupRequired)
            currentGroup = new SimpleEntry<T, List<K>>(key, new ArrayList<K>());
        currentGroup.getValue().add(nextValue);
    }
}

Upvotes: 1

Lodewijk Bogaards

Reputation: 19987

I doubt there is a way, since in Java 8 you cannot map from a sequence to a Map without collecting, nor can you group without collecting. You could create your own stream, but I doubt you really want to go that route.

So, though this is not an answer, I'd go with something like this if you want to save some clock cycles:

Map<Integer, IntStream> groups = IntStream.range(0, 10)
          .boxed()
          .collect(Collectors.toMap(
              Function.identity(),
              x -> IntStream.range(x * 10, x * 10 + 10)
          ));
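
For reference, the "create your own stream" route could look roughly like the sketch below. It assumes that a group is always a run of adjacent elements with the same key, as in the question. AdjacentGrouping and groupAdjacent are hypothetical names, not JDK API. The sketch wraps the source's iterator in a Spliterators.AbstractSpliterator, so groups are built lazily and the source may be infinite. It still boxes into List<Integer>, and it has to read one element past a group before it can emit that group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class AdjacentGrouping {

    /**
     * Lazily groups adjacent elements of the source that map to the same key.
     * Hypothetical helper for illustration; not part of the JDK.
     */
    static Stream<List<Integer>> groupAdjacent(IntStream source, IntUnaryOperator keyOf) {
        PrimitiveIterator.OfInt it = source.iterator();

        Spliterator<List<Integer>> spliterator =
            new Spliterators.AbstractSpliterator<List<Integer>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                // Holds the first element of the next group once we have read past a boundary.
                private boolean hasPending;
                private int pending;

                @Override
                public boolean tryAdvance(Consumer<? super List<Integer>> action) {
                    if (!hasPending) {
                        if (!it.hasNext()) {
                            return false; // source exhausted, no more groups
                        }
                        pending = it.nextInt();
                    }
                    int key = keyOf.applyAsInt(pending);
                    List<Integer> group = new ArrayList<>();
                    group.add(pending);
                    hasPending = false;

                    // Pull elements until the key changes or the source ends.
                    while (it.hasNext()) {
                        int next = it.nextInt();
                        if (keyOf.applyAsInt(next) != key) {
                            pending = next;
                            hasPending = true;
                            break;
                        }
                        group.add(next);
                    }
                    action.accept(group);
                    return true;
                }
            };
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        // Infinite source; only the first three groups are ever built.
        groupAdjacent(IntStream.iterate(0, i -> i + 1), i -> i / 10)
            .limit(3)
            .forEach(System.out::println);
    }
}
```

Because the grouping happens inside tryAdvance, downstream operations such as limit decide how much of the source is consumed. On a finite source the last, possibly shorter, group is emitted as well.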

Upvotes: 2
