supraja
supraja

Reputation: 103

Split a Flux into two based on a condition without GroupBy

For a given stream of data:

Flux<Integer> evenFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
                         .filter(i -> i % 2 == 0)

Flux<Integer> oddFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
                         .filter(i -> i % 2 != 0)

Instead of this how can I split using a single pipeline? All filtered elements to one Flux and discarded elements to another Flux. Using onDiscardHook or something?

Note: I need to do a repeat() operation on the new Flux. Was not able to perform repeat on GroupedFlux.

Upvotes: 2

Views: 1174

Answers (2)

Rafael Guillen
Rafael Guillen

Reputation: 1673

It depends, you have two options here:

Option 1 is to create two new pipelines from hot input source:

ConnectableFlux<Integer> input = Flux.range(1,7).publish();

//Pipeline 1
input.filter(i -> i % 2== 0)
    .subscribe(e -> System.out.println("Even stream value: " + e));

//Pipeline 2
input.filter(i -> i % 2 != 0)
    .subscribe(o -> System.out.println("Odd stream value: " + o));

input.connect();

Option 2 is to process results in a single subscriber, like grouping by:

Flux.just(1,2,3,4,5,6,7)
    .subscribe(n -> {
      if (n % 2 == 0) {
        System.out.println("Even stream value: " + n);
      } else {
        System.out.println("Odd stream value: " + n);
      }
    });

Upvotes: 1

samabcde
samabcde

Reputation: 8114

As suggested by @Michael Berry, using groupBy is one of the solution.
We can group using i % 2 which return a Flux<GroupedFlux>, then we can use GroupedFlux#key() to distinguish odd or even group.

public class SplitFlux {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7).groupBy(i -> i % 2).toStream()
                .forEach(groupedFlux ->
                {
                    System.out.println(groupedFlux.key() == 0 ? "even" : "odd");
                    groupedFlux.toStream().forEach(System.out::println);
                });
    }
}

Upvotes: 0

Related Questions