Reputation: 103
For a given stream of data:
Flux<Integer> evenFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
    .filter(i -> i % 2 == 0);
Flux<Integer> oddFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
    .filter(i -> i % 2 != 0);
Instead of this, how can I split the stream using a single pipeline, sending all filtered elements to one Flux and all discarded elements to another Flux? Using onDiscardHook or something?
Note: I need to perform a repeat() operation on the new Flux. I was not able to call repeat() on a GroupedFlux.
Upvotes: 2
Views: 1174
Reputation: 1673
It depends; you have two options here.
Option 1 is to create two new pipelines from a hot input source:
ConnectableFlux<Integer> input = Flux.range(1, 7).publish();
// Pipeline 1
input.filter(i -> i % 2 == 0)
    .subscribe(e -> System.out.println("Even stream value: " + e));
// Pipeline 2
input.filter(i -> i % 2 != 0)
    .subscribe(o -> System.out.println("Odd stream value: " + o));
input.connect();
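A variant of Option 1, as a minimal sketch: autoConnect(2) connects the shared source automatically once both filtered branches have subscribed, so no explicit connect() call is needed. The class name SplitHotSource, the method splitOneToSeven and the subscription counter are made up for illustration. The sketch also assumes a source that emits synchronously on the subscribing thread, as Flux.range does; with an asynchronous source you would have to wait for completion before reading the lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class SplitHotSource {
    // Counts how often the upstream source is subscribed to (illustrative only).
    static final AtomicInteger sourceSubscriptions = new AtomicInteger();

    // Returns [evens, odds] after a single pass over the source.
    static List<List<Integer>> splitOneToSeven() {
        List<Integer> evens = new ArrayList<>();
        List<Integer> odds = new ArrayList<>();

        Flux<Integer> shared = Flux.range(1, 7)
                .doOnSubscribe(s -> sourceSubscriptions.incrementAndGet())
                .publish()
                .autoConnect(2); // connect once two subscribers are present

        // Both branches read from the same shared upstream.
        shared.filter(i -> i % 2 == 0).subscribe(evens::add);
        shared.filter(i -> i % 2 != 0).subscribe(odds::add);
        return List.of(evens, odds);
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = splitOneToSeven();
        System.out.println("Even: " + parts.get(0));
        System.out.println("Odd: " + parts.get(1));
        System.out.println("Source subscriptions: " + sourceSubscriptions.get());
    }
}
```

The counter is only there to make it visible that the source is subscribed to once, rather than once per branch as in the two-pipeline version from the question.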
Option 2 is to process the results in a single subscriber, branching on the condition much as a group-by would:
Flux.just(1, 2, 3, 4, 5, 6, 7)
    .subscribe(n -> {
        if (n % 2 == 0) {
            System.out.println("Even stream value: " + n);
        } else {
            System.out.println("Odd stream value: " + n);
        }
    });
Upvotes: 1
Reputation: 8114
As suggested by @Michael Berry, using groupBy is one solution.
We can group by i % 2, which returns a Flux<GroupedFlux<Integer, Integer>>, and then use GroupedFlux#key() to tell the odd group from the even group.
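import reactor.core.publisher.Flux;

public class SplitFlux {
    public static void main(String[] args) {
        // Group by remainder: key 0 holds the even numbers, key 1 the odd ones.
        // Note that toStream() blocks the calling thread while it iterates.
        Flux.just(1, 2, 3, 4, 5, 6, 7).groupBy(i -> i % 2).toStream()
            .forEach(groupedFlux -> {
                System.out.println(groupedFlux.key() == 0 ? "even" : "odd");
                groupedFlux.toStream().forEach(System.out::println);
            });
    }
}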
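The same grouping can also be consumed without the intermediate toStream() calls. The following is a minimal sketch, not the only way to do it: each group is flattened with flatMap and collected into a list keyed by GroupedFlux#key(). The class name SplitFluxByGroup and the method splitOneToSeven are made up for illustration, and the final block() is there only so the example can return a plain Map.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

public class SplitFluxByGroup {
    // Returns a map with key true -> even numbers, key false -> odd numbers.
    static Map<Boolean, List<Integer>> splitOneToSeven() {
        return Flux.just(1, 2, 3, 4, 5, 6, 7)
                .groupBy(i -> i % 2 == 0)
                // Collect every group into a list and pair it with its key.
                .flatMap(group -> group.collectList()
                        .map(list -> Map.entry(group.key(), list)))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue)
                .block(); // blocking here only to hand back a plain Map
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> parts = splitOneToSeven();
        System.out.println("even: " + parts.get(true));
        System.out.println("odd: " + parts.get(false));
    }
}
```

For a finite source, each resulting list can be wrapped again with Flux.fromIterable(...), which yields an ordinary cold Flux on which repeat() can be called. This materialises the whole group in memory, so it is not suitable for unbounded streams.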
Upvotes: 0