matthieusb

Reputation: 505

Merging several Flux into one from Mongo Collection

I am fairly new to reactive programming with Spring. I am currently developing an app using spring-boot-starter-webflux:2.0.0.M5.

I have a Foodtruck MongoDB model that contains menus:

@Document(collection = "Foodtruck")
@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
public class Foodtruck {
  @Id
  private String id;

  @URL
  private String siteUrl;

  @NotBlank
  private String name;

  private String description;

  @NotBlank
  private String address;

  private List<Menu> menus;

  @JsonIgnore
  private GridFS image;
}

And here is the model for Menu :

@lombok.Getter
@lombok.Setter
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@Document(collection = "menus")
public class Menu {

  @Id
  private String id;

  private List<DayOfWeek> days;

  @NotBlank
  private String label;

  private String description;

  private Double price;

  private List<Dish> dishes;

  @JsonIgnore
  private GridFS image;
}

To get all my menus, I first need to get all my restaurants and then merge all the Flux obtained, and return them through my rest controller.

Here is the service I call from my rest resource :

@Service
public class MenuServiceImpl implements MenuService {

  FoodtruckService foodtruckService;

  public MenuServiceImpl(FoodtruckService foodtruckService) {
    this.foodtruckService = foodtruckService;
  }

  @Override
  public Flux<Menu> getAllMenus() {
    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

    foodtruckFlux.toStream().forEach(foodtruck -> {
      Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
      allMenuFlux.mergeWith(currentMenuFlux);
    });

    return allMenuFlux;
  }
}

The mergeWith call does not seem to add anything to allMenuFlux. I think I am misunderstanding something here.

I have been through the documentation and tested other methods such as concat and zip, but none of them interleaves the Flux events the way I want, and I can't manage to merge these Flux instances correctly. I also suspect there is a better way to get the embedded Mongo documents through a menu repository, since my method seems overkill and could cause performance problems in the long run. I tried that approach as well, but could not get it to work.

EDIT: After trying the following code (having made sure that my list is not empty), the resulting fluxtest3 variable is merged correctly:

Flux<Menu> allMenuFlux = Flux.empty();
Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();

List<Foodtruck> test = foodtruckFlux.collectList().block();

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

But that's not exactly what I want. Why doesn't it work with an empty Flux as the parent Flux?

What am I missing here?

Thanks in advance for your help.

Upvotes: 0

Views: 1914

Answers (1)

Jens Schauder

Reputation: 81907

I think there are a couple of misunderstandings at work here.

  1. If you ever convert a Flux to a Collection, Stream or similar, and then convert something obtained from that back to a Flux, you are almost certainly doing something wrong. These conversions force your pipeline to collect multiple elements, process them, and then turn them back into a Flux. In almost every case the same result can be achieved with the operators Flux itself offers.

  2. The methods on Flux don't change the Flux but create new instances with additional behavior. So if you have code like this:

    Flux<Menu> allMenuFlux = Flux.empty();
    Flux<Foodtruck> foodtruckFlux = foodtruckService.getAllFoodtrucks();
    
    foodtruckFlux.toStream().forEach(foodtruck -> {
        Flux<Menu> currentMenuFlux = Flux.fromIterable(foodtruck.getMenus());
        allMenuFlux.mergeWith(currentMenuFlux);
    });
    

    return allMenuFlux;

    Every time you call allMenuFlux.mergeWith(currentMenuFlux); you create a new Flux just so the garbage collector can take care of it. allMenuFlux is still the empty Flux you started with.

  3. What you really seem to want is:

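    // getMenus() returns a List, not a Publisher, so wrap it in a Flux for flatMap
    return foodtruckService.getAllFoodtrucks()
        .flatMap(foodtruck -> Flux.fromIterable(foodtruck.getMenus()));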
    

    See the documentation for flatMap. The difference between flatMap and mergeWith is that mergeWith keeps the elements of the Flux it is called on, which is superfluous when that Flux is empty, as in your use case.
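To make point 3 concrete, here is a minimal sketch, assuming reactor-core is on the classpath. Truck and allTrucks() are hypothetical stand-ins for Foodtruck and foodtruckService.getAllFoodtrucks(), with plain strings in place of Menu objects. It shows two ways to flatten the embedded lists: flatMap with each list wrapped in a Flux, and flatMapIterable, which accepts the list directly.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class FlatMapMenusSketch {

    // Hypothetical stand-in for the Foodtruck document: a name plus embedded menu labels.
    static final class Truck {
        final String name;
        final List<String> menus;

        Truck(String name, List<String> menus) {
            this.name = name;
            this.menus = menus;
        }

        List<String> getMenus() {
            return menus;
        }
    }

    // Hypothetical stand-in for foodtruckService.getAllFoodtrucks().
    static Flux<Truck> allTrucks() {
        return Flux.just(
            new Truck("A", Arrays.asList("a1", "a2")),
            new Truck("B", Arrays.asList("b1")));
    }

    // Variant 1: flatMap needs a Publisher per element, so each list is wrapped in a Flux.
    static Flux<String> menusViaFlatMap() {
        return allTrucks().flatMap(truck -> Flux.fromIterable(truck.getMenus()));
    }

    // Variant 2: flatMapIterable takes the Iterable directly and keeps source order.
    static Flux<String> menusViaFlatMapIterable() {
        return allTrucks().flatMapIterable(Truck::getMenus);
    }

    public static void main(String[] args) {
        // block() is used only to print results in this demo; a controller would return the Flux.
        System.out.println(menusViaFlatMap().collectList().block());
        System.out.println(menusViaFlatMapIterable().collectList().block()); // prints [a1, a2, b1]
    }
}
```

Both variants stay inside the reactive pipeline, so nothing is collected or blocked before the controller returns. Note that flatMap in general does not guarantee the order in which inner sequences are merged, whereas flatMapIterable emits the elements of each list in source order.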

Bonus: Your additional question

Flux<Menu> fluxtest1 = Flux.fromIterable(test.get(0).getMenus());
Flux<Menu> fluxtest2 = Flux.fromIterable(test.get(1).getMenus());
Flux<Menu> fluxtest3 = fluxtest1.mergeWith(fluxtest2);

Here you are not using the original fluxtest1 but the newly created fluxtest3, which is why it works.
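The difference between the two snippets can be reproduced in isolation. The following is a small sketch, assuming reactor-core is on the classpath; the class and method names are made up for illustration and strings stand in for Menu objects. It contrasts dropping the return value of mergeWith with keeping it.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeWithResultSketch {

    // Mirrors the loop in the question: the Flux returned by mergeWith is thrown away.
    static List<String> discarded() {
        Flux<String> all = Flux.empty();
        all.mergeWith(Flux.just("m1", "m2")); // creates a new Flux, which is never used
        return all.collectList().block();     // 'all' is still the empty Flux
    }

    // Mirrors the EDIT in the question: the Flux returned by mergeWith is the one consumed.
    static List<String> kept() {
        Flux<String> all = Flux.empty();
        Flux<String> merged = all.mergeWith(Flux.just("m1", "m2"));
        return merged.collectList().block();
    }

    public static void main(String[] args) {
        // block() is used only to print results in this demo.
        System.out.println(discarded()); // prints []
        System.out.println(kept());      // prints [m1, m2]
    }
}
```

Inside the forEach lambda from the question, reassigning allMenuFlux would not even compile, because a local variable captured by a lambda must be effectively final. That is one more reason to express the merge as a single operator chain rather than as a loop.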

Upvotes: 5
