Reputation: 866
Let's say I have both var a = Flux.just("A", "B", "C")
and var b = Flux.just("B", "C", "D")
I want to be able to intersect both variables and the result should be equivalent of a set intersect
Something like a.intersect(b)
or Flux.intersect(a, b)
that would result in (Flux of) ["B", "C"]
I could not find any operation that does this, any ideas?
Upvotes: 4
Views: 1579
Reputation: 11551
I like efficiency, so I like to use what is proven without overly depending on streaming (or fluxing) operations.
Disadvantage of this is the need to collect one of the fluxes into a sorted list. Perhaps you can know in advance whether one Flux is shorter. Seems to me however that are going to have to do such a thing no matter what since you have to compare each element of Flux A against all elements in Flux B (or at least until you find a match).
So, collect Flux A into a sorted list and then there is no reason not to use Collections::binarySearch
on your collected/sorted flux.
a.collectSortedList()
.flatMapMany(sorteda -> b.filter(be->Collections.binarySearch(sorteda, be)>=0))
.subscribe(System.out::println);
Upvotes: 0
Reputation: 11216
You could use join, filter, map and groupBy like so
//Join fluxes in tuple
a.join(b,s -> Flux.never(), s-> Flux.never(),Tuples::of)
//Filter out matching
.filter(t -> t.getT1().equals(t.getT2()))
//Revert to single value
.map(Tuple2::getT1)
//Remove duplicates
.groupBy(f -> f)
.map(GroupedFlux::key)
.subscribe(System.out::println);
Results in single subscription to each and will also work with dupes.
Or you could write your own intersect method
public <T> Flux<T> intersect(Flux<T> f1,Flux<T> f2){
return f1.join(f2,f ->Flux.never(),f-> Flux.never(),Tuples::of)
.filter(t -> t.getT1().equals(t.getT2()))
.map(Tuple2::getT1)
.groupBy(f -> f)
.map(GroupedFlux::key);
}
//Use on it's own
intersect(a,b).subscribe(System.out::println)
//Or with existing flux
a.transform(f -> intersect(a,f)).subscribe(System.out::println)
Upvotes: 1
Reputation: 72294
My favoured approach would be something like:
Flux.merge(a, b)
.groupBy(Function.identity())
.filterWhen(g -> g.count().map(l -> l>1))
.map(g -> g.key())
.subscribe(System.out::print); //Prints "BC"
(If a
or b
might contain duplicates, replace the first line with Flux.merge(a.distinct(), b.distinct())
.)
Each publisher is only played once, and it's trivial to expand it to more than two publishers if necessary.
Upvotes: 0