Reputation: 65
With SpringBoot 2 and with the Poi class (Point of Interest):
public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}
I have 2 Flux of Poi:
Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;
The first element availablePoisFlux contains Pois with:
The second element poiFlux contains Pois with:
(poidId is an identifier of a Poi).
I want to create a new Flux resultPoisFlux with Pois (with poidId, price, longitude and latitude) from two Flux (poiFlux and availablePoisFlux).
The poidId attribut is the key between the two Flux (poiFlux and availablePoisFlux).
Sample Implementation:
I think I can use the zipWith operator to do that, but I need some informations and advices with reactive operators (and filter ?)
I want to iterate from the first Flux and get informations (price) from the second flux using the poidId identifier and update price attribute with the correct value.
Sample Input Values:
poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9, latitude=11.2222, longitude=23.222)
availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)
Expected Result:
resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)
Something like that:
Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....
Thanks for you help.
Upvotes: 5
Views: 10626
Reputation: 1934
First of all zip/zipWith will stop emiting one of the steams ends. So its not an ideal choice when we need to use two list with unequal number of elements. Flux.concatWith can be used in that scenario and if an id or any thing to group elements.
final Flux<Poi> available = availablePoisFlux;
final Flux<Poi> poiFlux = priceOnlyPoisFlux;
return availablePoisFlux.concatWith(priceOnlyPoisFlux) //Flux<Poi>
.groupBy(Poi::getPoiId) //Flux<GroupedFlux<String,Poi>>
.onBackpressureBuffer(Integer.MAX_VALUE) //if more groups groupBy hangs
.flatMap(groupedPois -> groupedPois.reduce((p1,p1)-> {
Poi pricePoi = Objects.nonNull(p1.getPrice()) ? p1 : p2;
Poi latitudePoi = = Objects.isNull(p1.getPrice()) ? p1 : p2;
return new Poi(pricePoi.getPoiId(),
pricePoi.getPrice(),
latitudePoi.getLatitude(),
latitudePoi.getLongitude());
} //Flux<Poi>
.filter(poi -> Objects.nonNull(poi.getPrice()) && Objects.nonNull(poi.getLongitude())); //Flux<Poi>
final filter call will make sure any poi's that comes as with out a pair will be skipped from the final result.
step by step below
Upvotes: 0
Reputation: 1827
When trying to combine two Flux-es into one, there is also great chance that your need will request creating processor class, which will subscribe to booth Flux-es, accumulate data in internal state (map, tree, set, ...) and publish single output flux. Something like this:
// source data
public final Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
public final Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink = Sinks.many().multicast().onBackpressureBuffer();
// processing class example:
public class TrainAndWeatherProcessor {
private final Map<String, String> internalState = new HashMap<>();
public final Sinks.Many<WetTrainInfoEvent> wetTrainInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
public TrainAndWeatherProcessor(
Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink,
Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink
) {
treeInfoEventSink.asFlux()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(treeInfo -> {
// some heavy processing according your business
final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
if(somethingHasChanged) {
wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
}
});
trainArrivalDataEventEventSink.asFlux()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(treeInfo -> {
// some heavy processing according your business
final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
if(somethingHasChanged) {
wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
}
});
}
record SimpleTreeInfoEvent(){}
record TrainArrivalDataEvent(){}
record WetTrainInfoEvent(){}
}
This example processing class will take two streams (SimpleTreeInfoEvent and TrainArrivalDataEvent instances coming in) and produce single stream of WetTrainInfoEvent instances.
Upvotes: 0
Reputation: 28301
zip/zipWith
, but it only combines two sources pair-wise......as long as it has enough elements to make pairs. So it is only useful in your case if you are guaranteed that the elements come in the same order in both source, and there are no discrepancies in poiIds
on each side. In your example that is the case because, even though second source only has 4 elements, these elements are the same as the beginning of the first source.
poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(),
b.getPrice(),
a.getLatitude(),
a.getLongitude(),
a.getName()));
If there is no such guarantee, you need to somehow combine two disordered and disjoint sequences. You cannot do that without collecting all the element in one of the sources (preferably availablePoisFlux
), which means it will delay the processing of the other source until said source has completed.
One way of combining would be to collect all values into a map keyed by poiId
and then "iterate" over the second source. Since some elements might not be found in the map, you need handle
to be able to "skip" these:
availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
.flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
String poiId = poi.getPoiId();
if (knownPrices.containsKey(poiId) {
Double price = knownPrices.get(poiId);
Poi complete = new Poi(poiId, price, poi.getLatitude(),
poi.getLongitude(), poi.getName());
sink.next(complete);
} //else do nothing and let handle skip that poi
}));
Upvotes: 5