Reputation: 1476
How can I "shrink" a Java 8 stream containing a lot of items to a stream containing fewer?
I am not asking about mapping, where there's 1 "output" item for each input item, or reduction where a stream is reduced to a single value, but shrinking a stream of many items to one with fewer. The "shrinking" is stateful; emitting an item is based upon 1 or more previous items (though it's simply moving forward so the state is very simple).
I have a stream of simple timestamped events; either a START or STOP event. I need to reduce this stream of simple events into records, each comprising a start and stop time. In the simplest case, there is a START and STOP pair, but it is totally legal for there to be repeated STARTs without intervening STOPs. It's also legal, though degenerate, for there to be repeated STOPs.
Below is a (simplified) version to demonstrate. See the difference between input
and expected
; there are more input items than output.
The key thing is that shrinkEvents signature is in terms of Streams, not Lists. I would like a version that doesn't need the intermediate List<String> output
in shrinkEvents
.
public class ShrinkStream {
@Test
public void shrinkStream() {
Stream<String> input = Stream.of("START@1", "STOP@12", "START@14", "START@24", "STOP@35", "STOP@45");
List<String> expected = Arrays.asList("1-12", "14-24", "24-35");
Stream<String> actual = shrinkEvents(input);
assertEquals(expected, actual.collect(toList()));
}
private Stream<String> shrinkEvents(Stream<String> input) {
List<String> output = new ArrayList<>();
final StringBuilder startTime = new StringBuilder(); // mutable (effectively final BS)
input.forEach(s -> {
String[] tokens = s.split("@");
String type = tokens[0];
String time = tokens[1];
boolean isAlreadyActive = startTime.length() > 0;
if (isAlreadyActive)
output.add(startTime + "-" + time);
startTime.setLength(0); // reset
if (type.equals("START"))
startTime.append(time);
});
return output.stream();
}
}
Upvotes: 1
Views: 276
Reputation: 1355
The purpose of a string is to examine the elements inside the Stream independently for others without the concern of processing the element in order.
In this scenario, your ask is a little stretch because we need to keep track of the previous "START" element. The more correct way that I see is to use a Custom Collector.
public class ShrinkStream {
@Test
public void shrinkStream() {
Stream<String> input = Stream.of("START@1", "STOP@12", "START@14", "START@24", "STOP@35", "STOP@45").parallel();
List<String> expected = Arrays.asList("1-12", "14-24", "24-35");
MyShrinkCollector myShrinkCollector= new MyShrinkCollector();
assertEquals(expected, input.collect(myShrinkCollector));
}
}
public class MyShrinkCollector implements Collector<String, List<String>, List<String>> {
private String startNumber = null;
@Override
public Supplier<List<String>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<String>, String> accumulator() {
return (list, val) -> {
String[] s = val.split("@");
String type = s[0];
String num = s[1];
if (startNumber != null) {
list.add(startNumber + "-" + num);
startNumber = null;
}
if (type.equals("START")) startNumber = num;
};
}
@Override
public BinaryOperator<List<String>> combiner() {
return null;
}
@Override
public Function<List<String>, List<String>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return new HashSet<>();
}
}
Upvotes: 0