Reputation: 1130
I am new to project reactor and I am trying to manipulate some fields in a list
as Flux
inside a Mono
.
So I have Mono<Basket>
with an attribute called lines
which is a List<Line>
.
Then, for every line I have to call two external services to get some additional info.
Here is my code:
Mono<Basket> basketMono = //this doesn't work cause I map it to a Flux
Mono.just(basket)
.flatMapIterable(Basket::getLines)
.parallel(...)
.runOn(...)
.map((line) -> {
line.setInfo1(externalService1.getInfo());
line.setInfo2(externalService2.getInfo());
return line;
});
My main problem here is that I don't know how to set this additional info to the lines and keep the original object so the method that holds this code can return the Mono<Basket>
with all the additional info setted.
I am struggling with this. Is this approach right? Some help would be more than appreciate.
Upvotes: 0
Views: 1641
Reputation: 2047
A simple solution is you dont flatmap the lines, because it will create a new publisher with n element (and type line). Within flatmap, you can start another publisher, which goes to services, set data and then you can return the original object.
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
I suppose your external service calls are reactive, something like this:
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
Note that flatMap will automatically subscribe to the inner publisher.
Also I've created a simple example, you can test:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class Example {
class Item {
String info1;
String info2;
public void setInfo1(String info1) {
this.info1 = info1;
}
public void setInfo2(String info2) {
this.info2 = info2;
}
public String getInfo1() {
return info1;
}
public String getInfo2() {
return info2;
}
}
class Basket {
String user;
List<Item> items;
public Basket(String user, List<Item> items) {
this.user = user;
this.items = items;
}
}
Mono<String> mockService1() {
return Mono.just("some data " + ThreadLocalRandom.current().nextInt(100)).delayElement(Duration.ofMillis(100));
}
Mono<String> mockService2() {
return Mono.just("some other data " + ThreadLocalRandom.current().nextInt(1000)).delayElement(Duration.ofMillis(100));
}
Mono<Item> callService1(Item item) {
return mockService1().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
Mono<Item> callService2(Item item) {
return mockService2().zipWith(Mono.just(item))
.map(it -> {
var result = it.getT2();
result.setInfo1(it.getT1());
return result;
});
}
@Test
public void testBasket() {
var basket = new Basket("first", List.of(new Item(), new Item(), new Item()));
Mono<Basket> basketMono = Mono.just(basket)
.flatMap(b ->
Flux.fromIterable(b.items)
.flatMap(this::callService1)
.flatMap(this::callService2)
.then(Mono.just(b))
);
StepVerifier.create(basketMono)
.expectNextMatches(b -> b.items.get(0).info1 != null)
.verifyComplete();
}
}
Upvotes: 3