Reputation: 20760
I want to split a single Stream into a Stream of Streams based on the contents of the Stream. Each resulting Stream should contain part of the original stream's data.
My real application is more complex (it is grouping log lines that are within a list of time intervals), but my problem is how to handle the streams, so here I ask about a simplified example.
I want to be able to split a Stream<Integer> into a Stream<Stream<Integer>> based on the same number being repeated, keeping only the streams with odd numbers.
For example the following stream containing:
{1,1,1,2,2,2,3,6,7,7,1,1}
Would need to result in a stream of streams containing:
{{1,1,1},{3},{7,7},{1,1}}
Leaving out the even numbers I can do by starting (or ending) with a filter:
Stream<Integer> input = ...;
Stream<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();
This is undesirable because each input value would be evaluated twice. That is acceptable, but I would prefer to avoid it.
My current solution does this by iterating over the contents of the stream, creating a List<List<Integer>> and converting that to a Stream<Stream<Integer>>. However, this means the full result is kept in memory (which is undesirable for my application).
I also think I would be able to pull this off by writing my own Iterator that reads from the stream, but I am not sure how this would work.
How can I convert a Stream into a Stream of Streams based on the contents of the original Stream, without first storing the full result as a List of Lists?
Upvotes: 9
Views: 4413
Reputation: 100329
You can use my StreamEx library. It has groupRuns, which does the job:
List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.groupRuns(Integer::equals)
.map(List::stream);
Usage example:
streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);
Output:
1,1,1
3
7,7
1,1
As in the protonpack library, there is a custom spliterator inside, but with StreamEx you can take advantage of parallel processing (protonpack does not split at all).
In sequential processing at most one intermediate list resides in memory at a time (the others are eligible for GC). If you still worry about memory consumption (for example, because you have very long groups), there is an alternative way to solve this task since StreamEx 0.3.3:
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.runLengths()
.mapKeyValue(StreamEx::constant);
The runLengths method returns a stream of entries where the key is the element and the value is the number of adjacent repeating elements. After that, StreamEx.constant is used, which is a shortcut for Stream.generate(() -> value).limit(length). So you will have constant intermediate memory consumption even for very long groups. Of course this version is also parallel-friendly.
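For readers who want to see the idea without the library, here is a minimal plain-JDK sketch of run lengths followed by the Stream.generate(...).limit(...) expansion described above. The class RunLengthSketch and its methods are hypothetical names, not StreamEx API. Unlike StreamEx, this sketch collects the (element, count) entries into a list before expanding them, so it is sequential only and keeps one small entry per run in memory.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class for illustration; not part of StreamEx.
final class RunLengthSketch {

    // Collapses adjacent equal elements into (element, count) entries.
    static <T> List<Map.Entry<T, Long>> runLengths(Iterator<T> it) {
        List<Map.Entry<T, Long>> runs = new ArrayList<>();
        if (!it.hasNext()) {
            return runs;
        }
        T current = it.next();
        long count = 1;
        while (it.hasNext()) {
            T next = it.next();
            if (Objects.equals(current, next)) {
                count++;
            } else {
                runs.add(new SimpleImmutableEntry<>(current, count));
                current = next;
                count = 1;
            }
        }
        runs.add(new SimpleImmutableEntry<>(current, count));
        return runs;
    }

    // Rebuilds one run lazily from its element and length.
    static <T> Stream<T> constant(T value, long length) {
        return Stream.generate(() -> value).limit(length);
    }

    // Filters odd numbers first, then turns every run into its own stream.
    static Stream<Stream<Integer>> oddRuns(Stream<Integer> input) {
        Iterator<Integer> odd = input.filter(i -> (i & 1) == 1).iterator();
        return runLengths(odd).stream().map(e -> constant(e.getKey(), e.getValue()));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        oddRuns(input.stream())
            .map(s -> s.map(String::valueOf).collect(Collectors.joining(",")))
            .forEach(System.out::println);
    }
}
```

Each inner stream holds only the element and a counter, so a run of a million equal values costs no more memory than a run of two.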
Update: StreamEx 0.3.3 is released, thus the second solution is now eligible as well.
Upvotes: 4
Reputation: 5588
I am afraid it is not doable, at least not in a nice way. Even if you map the elements into streams and reduce them, these internal streams will have to know what elements they contain so they will have to store something.
The simplest solution is to just use groupingBy; however, it will store all results in the map:
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
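One caveat worth illustrating: groupingBy groups by value over the whole input, not by adjacent runs, so the two separate runs of 1 in the question's example end up in a single list. The sketch below shows this; GroupingBySketch and groupOdd are hypothetical names, and a LinkedHashMap is used only to make the printed order predictable.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical class for illustration only.
final class GroupingBySketch {

    // Groups the odd numbers by value; adjacency in the input is ignored.
    static Map<Integer, List<Integer>> groupOdd(List<Integer> input) {
        return input.stream()
            .filter(i -> (i & 1) == 1)
            .collect(Collectors.groupingBy(i -> i, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        // Prints {1=[1, 1, 1, 1, 1], 3=[3], 7=[7, 7]}: both runs of 1 are merged.
        System.out.println(groupOdd(input));
    }
}
```

This differs from the {{1,1,1},{3},{7,7},{1,1}} result asked for in the question, so groupingBy only fits when merging equal values across the whole input is acceptable.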
You could try using a reduce operation, but it would require you to implement your own Stream of Streams in which you would have to store what elements every stream contains anyway. Not to mention that it would be a lot of effort to implement.
The best solution I can think of in your case would be to iterate over the list twice:
public static void main(String[] args) {
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
System.out.println(subList); // do something with the stream instead of collecting to list
});
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
Note, however, that it has O(n^2) time complexity.
EDIT:
This solution will only have local groups of elements. It stores only the current local group.
public static void main(String[] args) {
    Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
    Iterator<Integer> iterator = input.iterator();
    if (!iterator.hasNext()) {
        return; // empty input: there are no groups to report
    }
    Integer previous = iterator.next();
    List<Integer> buffer = new ArrayList<>();
    buffer.add(previous);
    // a while loop (instead of do/while) also handles a single-element stream
    while (iterator.hasNext()) {
        Integer current = iterator.next();
        if (!Objects.equals(previous, current)) {
            doSomethingWithTheGroup(buffer);
            buffer = new ArrayList<>(); // let GC remove the previous buffer
        }
        buffer.add(current);
        previous = current;
    }
    doSomethingWithTheGroup(buffer); // flush the last group
}
private static void doSomethingWithTheGroup(List<Integer> buffer) {
System.out.println(buffer);
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
output:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
Upvotes: 3
Reputation: 93892
You may want to implement your own aggregating spliterator to do this. There's already something similar in the proton-pack library (the first link redirects to the one implemented in proton-pack).
Note that you get a Stream<List<Integer>> (you may try to modify the implementation to produce a Stream<Stream<Integer>> directly, but you always need to buffer a small number of elements, depending on the window's size, to test whether you should create a new window or not). So for example:
StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1),
Objects::equals)
.forEach(System.out::println);
outputs:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
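If you would rather not add a dependency, a rough plain-JDK sketch of such an aggregating spliterator could look like the following. This is not the proton-pack implementation; the class GroupingSpliteratorSketch and the helper groupRuns are hypothetical names. It is sequential only and buffers one group at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical class for illustration; not taken from proton-pack.
final class GroupingSpliteratorSketch<T> extends Spliterators.AbstractSpliterator<List<T>> {
    private final Spliterator<T> source;
    private final BiPredicate<T, T> sameGroup;
    private List<T> current = new ArrayList<>();
    private List<T> completed; // set when an element closes the current group

    GroupingSpliteratorSketch(Spliterator<T> source, BiPredicate<T, T> sameGroup) {
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        this.source = source;
        this.sameGroup = sameGroup;
    }

    // Adds one source element, closing the open group if the element does not belong to it.
    private void accept(T element) {
        if (!current.isEmpty() && !sameGroup.test(current.get(current.size() - 1), element)) {
            completed = current;
            current = new ArrayList<>();
        }
        current.add(element);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        completed = null;
        while (completed == null && source.tryAdvance(this::accept)) {
            // keep reading until a group is closed or the source is exhausted
        }
        if (completed == null && !current.isEmpty()) {
            completed = current; // source exhausted: emit the last open group
            current = new ArrayList<>();
        }
        if (completed == null) {
            return false;
        }
        action.accept(completed);
        return true;
    }

    // Wraps a stream so that adjacent elements matching sameGroup are emitted as one list.
    static <T> Stream<List<T>> groupRuns(Stream<T> input, BiPredicate<T, T> sameGroup) {
        return StreamSupport.stream(
            new GroupingSpliteratorSketch<>(input.spliterator(), sameGroup), false);
    }

    public static void main(String[] args) {
        groupRuns(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), Objects::equals)
            .forEach(System.out::println);
    }
}
```

Appending .map(List::stream) to the result gives a Stream<Stream<Integer>>, and a filter placed before groupRuns drops the even numbers as in the question.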
Upvotes: 6
Reputation: 14383
Like @Jaroslaw, I also used a Map to hold the different Streams. However, the map can hold Streams that are built up from the input instead of being collected up front. Using Stream.concat and Stream.of you can add one element to a stream:
Map<Integer, Stream<Integer>> streamMap = new HashMap<>();
int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1};
Arrays.stream(arr)
.filter(this::isOdd)
.forEach(i -> {
Stream<Integer> st = streamMap.get(i);
if (st == null) st = Stream.of(i);
else st = Stream.concat(st, Stream.of(i));
streamMap.put(i, st);
});
streamMap.entrySet().stream().forEach(e -> {
System.out.print(e.getKey() + "={");
e.getValue().forEach(System.out::print);
System.out.println("}");
});
Output:
1={11111}
3={3}
7={77}
Upvotes: -1