Paplusc

Reputation: 1130

How to manipulate a list attribute inside Mono as a flux?

I am new to Project Reactor and I am trying to manipulate some fields of a list as a Flux inside a Mono.

I have a Mono<Basket> with an attribute called lines, which is a List<Line>. For every line I have to call two external services to get some additional info.

Here is my code:

Mono<Basket> basketMono =  //this doesn't work cause I map it to a Flux
        Mono.just(basket)
            .flatMapIterable(Basket::getLines)
            .parallel(...)
            .runOn(...)
            .map((line) -> {
                    line.setInfo1(externalService1.getInfo());
                    line.setInfo2(externalService2.getInfo());
                    return line;
             });

My main problem is that I don't know how to set this additional info on the lines while keeping the original object, so that the method holding this code can return the Mono<Basket> with all the additional info set.

I am struggling with this. Is this approach right? Any help would be much appreciated.

Upvotes: 0

Views: 1641

Answers (1)

zlaval

Reputation: 2047

A simple solution is not to flatMap the lines on the outer chain, because that creates a new publisher with n elements of type Line and the original Basket is lost. Instead, start another publisher inside flatMap that calls the services and sets the data, and then return the original object.

Mono<Basket> basketMono = Mono.just(basket)
        .flatMap(b ->
                Flux.fromIterable(b.items)
                        .flatMap(this::callService1)
                        .flatMap(this::callService2)
                        .then(Mono.just(b))
        );

I suppose your external service calls are reactive, something like this:

 Mono<Item> callService1(Item item) {
        return mockService1().zipWith(Mono.just(item))
                .map(it -> {
                    var result = it.getT2();
                    result.setInfo1(it.getT1());
                    return result;
                });
 }

 Mono<String> mockService1() {
    return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
 }
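
If the external services are blocking rather than reactive (the question's externalService1.getInfo() looks like a plain blocking call), one option is to wrap each call in Mono.fromCallable and move it off the calling thread. The following is only a minimal sketch under that assumption: BlockingWrap, Line, blockingGetInfo and enrichAll are hypothetical names, and Schedulers.boundedElastic() assumes Reactor 3.3 or newer.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingWrap {

    // Hypothetical stand-in for the question's Line type.
    static class Line {
        String info1;
    }

    // Hypothetical blocking client call; a real one would do network I/O.
    static String blockingGetInfo() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "info";
    }

    // Defers the blocking call until subscription and runs it on a
    // scheduler intended for blocking work.
    static Mono<Line> callService1(Line line) {
        return Mono.fromCallable(BlockingWrap::blockingGetInfo)
                .subscribeOn(Schedulers.boundedElastic())
                .map(info -> {
                    line.info1 = info;
                    return line;
                });
    }

    // Enriches every line and emits the same list once all calls are done.
    static Mono<List<Line>> enrichAll(List<Line> lines) {
        return Flux.fromIterable(lines)
                .flatMap(BlockingWrap::callService1)
                .then(Mono.just(lines));
    }

    public static void main(String[] args) {
        List<Line> lines = enrichAll(List.of(new Line(), new Line())).block();
        System.out.println(lines.get(0).info1); // prints "info"
    }
}
```

With this wrapper, callService1 has the same Mono-returning shape as the reactive version above, so it can be used in the same flatMap chain.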

Note that flatMap will automatically subscribe to the inner publisher.
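
Because flatMap subscribes to the inner publishers eagerly, the items are processed concurrently, but with two chained flatMap calls each item still waits for service 1 before service 2 is called. If the two calls are independent of each other, a possible variation is to combine them with Mono.zip so both are subscribed at once for each line. This is a sketch only: ZipEnrichment, Line, Basket, service1, service2, enrichLine and enrich are hypothetical names, not part of the original code.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipEnrichment {

    // Hypothetical stand-ins for the question's Line and Basket types.
    static class Line {
        String info1;
        String info2;
    }

    static class Basket {
        final List<Line> lines;

        Basket(List<Line> lines) {
            this.lines = lines;
        }
    }

    // Hypothetical reactive service calls; real ones would do I/O.
    static Mono<String> service1() {
        return Mono.just("info1").delayElement(Duration.ofMillis(50));
    }

    static Mono<String> service2() {
        return Mono.just("info2").delayElement(Duration.ofMillis(50));
    }

    // Subscribes to both services for one line and sets both fields
    // once both results have arrived.
    static Mono<Line> enrichLine(Line line) {
        return Mono.zip(service1(), service2())
                .map(tuple -> {
                    line.info1 = tuple.getT1();
                    line.info2 = tuple.getT2();
                    return line;
                });
    }

    // Enriches every line, then emits the original basket.
    static Mono<Basket> enrich(Basket basket) {
        return Flux.fromIterable(basket.lines)
                .flatMap(ZipEnrichment::enrichLine)
                .then(Mono.just(basket));
    }

    public static void main(String[] args) {
        Basket result = enrich(new Basket(List.of(new Line(), new Line()))).block();
        Line first = result.lines.get(0);
        System.out.println(first.info1 + " / " + first.info2); // prints "info1 / info2"
    }
}
```

Note that Mono.zip fails as soon as either source fails, so error handling per service (for example a fallback value) would have to be added to the individual Monos if partial results are acceptable.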

I've also put together a simple, complete example that you can run as a test:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class Example {

    class Item {
        String info1;
        String info2;

        public void setInfo1(String info1) {
            this.info1 = info1;
        }

        public void setInfo2(String info2) {
            this.info2 = info2;
        }

        public String getInfo1() {
            return info1;
        }

        public String getInfo2() {
            return info2;
        }
    }

    class Basket {
        String user;
        List<Item> items;

        public Basket(String user, List<Item> items) {
            this.user = user;
            this.items = items;
        }
    }

    Mono<String> mockService1() {
        return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
    }

    Mono<String> mockService2() {
        return Mono.just("some other data " + ThreadLocalRandom.current().nextInt(1000)).delayElement(Duration.ofMillis(100));
    }

    Mono<Item> callService1(Item item) {
        return mockService1().zipWith(Mono.just(item))
                .map(it -> {
                    var result = it.getT2();
                    result.setInfo1(it.getT1());
                    return result;
                });
    }

    Mono<Item> callService2(Item item) {
        return mockService2().zipWith(Mono.just(item))
                .map(it -> {
                    var result = it.getT2();
                    result.setInfo2(it.getT1()); // service 2 fills info2, not info1
                    return result;
                });
    }


    @Test
    public void testBasket() {
        var basket = new Basket("first", List.of(new Item(), new Item(), new Item()));
        Mono<Basket> basketMono = Mono.just(basket)
                .flatMap(b ->
                        Flux.fromIterable(b.items)
                                .flatMap(this::callService1)
                                .flatMap(this::callService2)
                                .then(Mono.just(b))
                );


        StepVerifier.create(basketMono)
                .expectNextMatches(b -> b.items.get(0).info1 != null)
                .verifyComplete();

    }


}

Upvotes: 3
