OlivierM
OlivierM

Reputation: 65

Combine two Flux instances of different types

With SpringBoot 2 and with the Poi class (Point of Interest):

public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}

I have 2 Flux of Poi:

Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;

The first element availablePoisFlux contains Pois with:

The second element poiFlux contains Pois with:

(poidId is an identifier of a Poi).

I want to create a new Flux resultPoisFlux with Pois (with poidId, price, longitude and latitude) from two Flux (poiFlux and availablePoisFlux).

The poidId attribut is the key between the two Flux (poiFlux and availablePoisFlux).

Sample Implementation:

I think I can use the zipWith operator to do that, but I need some informations and advices with reactive operators (and filter ?)

I want to iterate from the first Flux and get informations (price) from the second flux using the poidId identifier and update price attribute with the correct value.

Sample Input Values:

poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9,  latitude=11.2222, longitude=23.222)

availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)

Expected Result:

resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)

Something like that:

Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....

Thanks for you help.

Upvotes: 5

Views: 10626

Answers (3)

Arundev
Arundev

Reputation: 1934

First of all zip/zipWith will stop emiting one of the steams ends. So its not an ideal choice when we need to use two list with unequal number of elements. Flux.concatWith can be used in that scenario and if an id or any thing to group elements.

final Flux<Poi> available = availablePoisFlux;
final Flux<Poi> poiFlux = priceOnlyPoisFlux;

 return availablePoisFlux.concatWith(priceOnlyPoisFlux) //Flux<Poi>
.groupBy(Poi::getPoiId) //Flux<GroupedFlux<String,Poi>>
.onBackpressureBuffer(Integer.MAX_VALUE) //if more groups groupBy hangs
.flatMap(groupedPois -> groupedPois.reduce((p1,p1)-> {
   Poi pricePoi = Objects.nonNull(p1.getPrice()) ? p1 : p2;
   Poi latitudePoi =  = Objects.isNull(p1.getPrice()) ? p1 : p2;
   return new Poi(pricePoi.getPoiId(),
            pricePoi.getPrice(),
            latitudePoi.getLatitude(),
            latitudePoi.getLongitude());

} //Flux<Poi>
.filter(poi -> Objects.nonNull(poi.getPrice()) && Objects.nonNull(poi.getLongitude())); //Flux<Poi>

final filter call will make sure any poi's that comes as with out a pair will be skipped from the final result.

step by step below

  1. Combine two flux
  2. Group them with poiId
  3. flatmap -> only a pair exist this will come to flatMap and operation loic mentioned inside is executed. here its reduce a stream
  4. filter -> if a pair exist, then it will go in step 4 and if a pair not exist directly it will skip step 3 and available in step 4 irrespective of object received, we are checking the price and latitude fields both of them as are present or not. if present it will availbale at the end.

Upvotes: 0

Lubo
Lubo

Reputation: 1827

When trying to combine two Flux-es into one, there is also great chance that your need will request creating processor class, which will subscribe to booth Flux-es, accumulate data in internal state (map, tree, set, ...) and publish single output flux. Something like this:

// source data
public final Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
public final Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink = Sinks.many().multicast().onBackpressureBuffer();

// processing class example:
public class TrainAndWeatherProcessor {
    private final Map<String, String> internalState = new HashMap<>();

    public final Sinks.Many<WetTrainInfoEvent> wetTrainInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
    public TrainAndWeatherProcessor(
            Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink,
            Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink
    ) {
        treeInfoEventSink.asFlux()
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(treeInfo -> {
                    // some heavy processing according your business
                    final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
                    if(somethingHasChanged) {
                        wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
                    }
                });

        trainArrivalDataEventEventSink.asFlux()
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(treeInfo -> {
                    // some heavy processing according your business
                    final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
                    if(somethingHasChanged) {
                        wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
                    }
                });
    }

    record SimpleTreeInfoEvent(){}
    record TrainArrivalDataEvent(){}
    record WetTrainInfoEvent(){}
}

This example processing class will take two streams (SimpleTreeInfoEvent and TrainArrivalDataEvent instances coming in) and produce single stream of WetTrainInfoEvent instances.

Upvotes: 0

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

zip/zipWith, but it only combines two sources pair-wise...

...as long as it has enough elements to make pairs. So it is only useful in your case if you are guaranteed that the elements come in the same order in both source, and there are no discrepancies in poiIds on each side. In your example that is the case because, even though second source only has 4 elements, these elements are the same as the beginning of the first source.

poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(), 
    b.getPrice(),
    a.getLatitude(),
    a.getLongitude(),
    a.getName()));

More generic solution, less reactive one

If there is no such guarantee, you need to somehow combine two disordered and disjoint sequences. You cannot do that without collecting all the element in one of the sources (preferably availablePoisFlux), which means it will delay the processing of the other source until said source has completed.

One way of combining would be to collect all values into a map keyed by poiId and then "iterate" over the second source. Since some elements might not be found in the map, you need handle to be able to "skip" these:

availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
    .flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
        String poiId = poi.getPoiId();
        if (knownPrices.containsKey(poiId) {
            Double price = knownPrices.get(poiId);
            Poi complete = new Poi(poiId, price, poi.getLatitude(),
                poi.getLongitude(), poi.getName());
            sink.next(complete);
        } //else do nothing and let handle skip that poi
    }));

Upvotes: 5

Related Questions