Thirler

Reputation: 20760

Create stream of streams from one long stream

I want to split a single Stream into a Stream of Streams based on the contents of the Stream. Each resulting Stream should contain part of the original Stream's data.

My real application is more complex (it is grouping log lines that are within a list of time intervals), but my problem is how to handle the streams, so here I ask about a simplified example.

Example problem

I want to be able to split a Stream<Integer> into a Stream<Stream<Integer>> based on the same number being repeated, keeping only the streams of odd numbers.

For example the following stream containing:

{1,1,1,2,2,2,3,6,7,7,1,1}

Would need to result in a stream of streams containing:

{{1,1,1},{3},{7,7},{1,1}}

Leaving out the even numbers I can do by starting (or ending) with a filter:

Stream<Integer> input = ...;
Stream<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();

This is not ideal, as it would mean evaluating each input value twice. That is acceptable, but I would prefer to avoid it.

Ideas for solutions

My current solution does this by iterating over the contents of the stream, creating a List<List<Integer>>, and converting that to a Stream<Stream<Integer>>. However, this means the full result is kept in memory (which is undesirable for my application).

I also think I would be able to pull this off by writing my own Iterator that reads from the stream, but I am not sure how this would work.

Question

How can I convert a Stream into a Stream of Streams based on the contents of the original Stream, without storing the full result as a List of Lists first?

Upvotes: 9

Views: 4413

Answers (4)

Tagir Valeev

Reputation: 100329

You can use my StreamEx library. It has groupRuns which does the job:

List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
    .groupRuns(Integer::equals)
    .map(List::stream);

Usage example:

streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);

Output:

1,1,1
3
7,7
1,1

As in the protonpack library, there is a custom spliterator inside, but with StreamEx you can take advantage of parallel processing (the protonpack spliterator does not split at all).

In sequential processing, at most one intermediate list resides in memory at a time (the others are eligible for GC). If you still worry about memory consumption (for example, because you have very long groups), there is an alternative way to solve this task since StreamEx 0.3.3:

Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
        .runLengths()
        .mapKeyValue(StreamEx::constant);

The runLengths method returns a stream of entries where the key is the element and the value is the number of adjacent repeating elements. After that, StreamEx.constant is used, which is a shortcut for Stream.generate(() -> value).limit(length). So you will have constant intermediate memory consumption even for very long groups. Of course, this version is also parallel-friendly.
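To make the run-length idea concrete without the library, here is a minimal plain-JDK sketch. It is not StreamEx's implementation: the helpers runLengths and constant below are hypothetical stand-ins that only mirror the names used above, and the sketch is sequential only. It keeps one element and one counter per run instead of a list.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunLengthSketch {

    /** Hypothetical helper: lazily turns a stream into (element, run length) entries. */
    static <T> Stream<Map.Entry<T, Long>> runLengths(Stream<T> source) {
        Iterator<T> it = source.iterator();
        Iterator<Map.Entry<T, Long>> runs = new Iterator<Map.Entry<T, Long>>() {
            private T pending;          // first element of the next run, if already read
            private boolean hasPending; // true while 'pending' holds an unread element

            @Override
            public boolean hasNext() {
                return hasPending || it.hasNext();
            }

            @Override
            public Map.Entry<T, Long> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T value = hasPending ? pending : it.next();
                hasPending = false;
                long count = 1;
                // Consume the rest of the run; remember the element that ends it.
                while (it.hasNext()) {
                    T candidate = it.next();
                    if (Objects.equals(candidate, value)) {
                        count++;
                    } else {
                        pending = candidate;
                        hasPending = true;
                        break;
                    }
                }
                return new SimpleImmutableEntry<>(value, count);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(runs, Spliterator.ORDERED), false);
    }

    /** Hypothetical helper: a stream that repeats 'value' exactly 'count' times. */
    static <T> Stream<T> constant(T value, long count) {
        return Stream.generate(() -> value).limit(count);
    }

    /** Collects the inner streams to lists only so the result can be inspected. */
    static List<List<Integer>> demo() {
        Stream<Integer> odd = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1)
                .filter(i -> (i & 1) == 1);
        return runLengths(odd)
                .map(e -> constant(e.getKey(), e.getValue()))
                .map(s -> s.collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 1, 1], [3], [7, 7], [1, 1]]
    }
}
```

Each inner stream is regenerated from a single value and a count, so this only works when the elements of a run are interchangeable, as they are with equal Integers.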

Update: StreamEx 0.3.3 has been released, so the second solution is now available as well.

Upvotes: 4

Jaroslaw Pawlak

Reputation: 5588

I am afraid it is not doable, at least not in a nice way. Even if you map the elements into streams and reduce them, these internal streams will have to know what elements they contain so they will have to store something.

The simplest solution is to just use groupingBy; however, it will store all results in the map:

List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
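One caveat worth checking: groupingBy groups by value across the whole input, not by adjacent runs, so the two separate runs of 1 in the example end up in the same group. A small self-contained check (the class name is only for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingByCheck {

    /** Groups the question's example input by value, as in the snippet above. */
    static Map<Integer, List<Integer>> grouped() {
        List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        return input.stream().collect(Collectors.groupingBy(i -> i));
    }

    public static void main(String[] args) {
        // Both runs of 1 land under the same key.
        System.out.println(grouped().get(1)); // [1, 1, 1, 1, 1]
    }
}
```

So this variant yields {1,1,1,1,1} rather than the {1,1,1} and {1,1} asked for in the question.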

You could try using reduce operation but it would require you to implement your own Stream of Streams in which you would have to store what elements every stream contains anyway. Not to mention that it would be a lot of effort to implement it.

The best solution I can think of in your case would be to iterate over the list twice:

public static void main(String[] args) {
    List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
        List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
        System.out.println(subList); // do something with the stream instead of collecting to list
    });
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

Note however that it has O(n^2) time complexity.

EDIT:

This solution groups only adjacent equal elements and keeps only the current group in memory. Note that it does not apply isOdd, so groups of even numbers are reported too.

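import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

public static void main(String[] args) {
    Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    Iterator<Integer> iterator = input.iterator();
    if (!iterator.hasNext()) {
        return; // empty input: there are no groups to report
    }

    Integer previous = iterator.next();
    List<Integer> buffer = new ArrayList<>();
    buffer.add(previous);

    // hasNext() is checked before every next(), so a single-element input works too
    while (iterator.hasNext()) {
        Integer current = iterator.next();
        if (!Objects.equals(previous, current)) {
            doSomethingWithTheGroup(buffer);
            buffer = new ArrayList<>(); // let GC remove the previous buffer
        }
        buffer.add(current);
        previous = current;
    }
    doSomethingWithTheGroup(buffer); // the last group is still in the buffer
}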

private static void doSomethingWithTheGroup(List<Integer> buffer) {
    System.out.println(buffer);
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

output:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]

Upvotes: 3

Alexis C.

Reputation: 93892

You may want to implement your own aggregating spliterator to do this. There's already something similar in the proton-pack library (the first link redirects to the one implemented in proton-pack).

Note that you get a Stream<List<Integer>> (you may try to modify the implementation to produce a Stream<Stream<Integer>> directly, but you always need to buffer a small number of elements, depending on the window's size, to test whether you should create a new window or not). For example:

StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), 
                      Objects::equals)
           .forEach(System.out::println);

outputs:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
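If you would rather not add a dependency, an aggregating spliterator of this kind can be sketched with the JDK alone. The following is a minimal sequential sketch, not proton-pack's code; groupRuns is a hypothetical helper name, and only the group currently being built is buffered.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunGroupingSketch {

    /** Hypothetical helper: groups adjacent elements while sameGroup(previous, current) holds. */
    static <T> Stream<List<T>> groupRuns(Stream<T> source,
                                         BiPredicate<? super T, ? super T> sameGroup) {
        Iterator<T> it = source.iterator();
        Spliterator<List<T>> spliterator =
                new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
            private List<T> current = new ArrayList<>(); // the only buffered group

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                while (it.hasNext()) {
                    T next = it.next();
                    boolean startsNewGroup = !current.isEmpty()
                            && !sameGroup.test(current.get(current.size() - 1), next);
                    if (startsNewGroup) {
                        // Emit the finished group; 'next' opens the following one.
                        List<T> finished = current;
                        current = new ArrayList<>();
                        current.add(next);
                        action.accept(finished);
                        return true;
                    }
                    current.add(next);
                }
                if (current.isEmpty()) {
                    return false; // source exhausted and nothing left to emit
                }
                List<T> last = current;
                current = new ArrayList<>();
                action.accept(last);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    static List<List<Integer>> demo() {
        return groupRuns(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), Objects::equals)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Mapping each emitted list with List::stream gives a Stream<Stream<Integer>>, and filtering the source with an odd-number predicate first reproduces the grouping asked for in the question.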

Upvotes: 6

Sharon Ben Asher

Reputation: 14383

Like @Jaroslaw, I also used a Map to hold the different Streams. However, the map can hold Streams that are built up from the input rather than collected up front. Using Stream.concat and Stream.of you can add one element to a stream:

    Map<Integer, Stream<Integer>> streamMap = new HashMap<>();

    int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1};
    Arrays.stream(arr)
    .filter(this::isOdd)
    .forEach(i -> {
        Stream<Integer> st = streamMap.get(i);
        if (st == null)  st = Stream.of(i);
        else st = Stream.concat(st, Stream.of(i));
        streamMap.put(i, st);
    });

    streamMap.entrySet().stream().forEach(e -> {
        System.out.print(e.getKey() + "={");
        e.getValue().forEach(System.out::print);
        System.out.println("}");
    });

Output:

1={11111}
3={3}
7={77}

Upvotes: -1
