Reputation: 1333
I am finally learning functional style programming with Reactor. So I'm new to this.
First thing I want to do is to call external API with WebClient. This call needs to be recursive as response provides the next value of the calling parameter and I need to use it on the next call until trivial case is met.
Here's what came up with:
Flux.from(p -> queryUntilNow())
.flatMap(res -> // res is object )
.subscribe( // process )
private Flux<ApiResp> queryUntilNow() {
return Flux.from(p -> {
queryAPI(since)
.doOnError(System.out::println)
.subscribe(apiResp -> {
if (since == apiResp.last) return;
since = apiResp.last;
queryUntilNow();
});
});
}
private Flux<ApiResp> queryAPI(int last) {
Flux<ApiResp> resp = kapi.get()
.uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
.retrieve()
.bodyToFlux(ApiResp.class);
return resp;
}
Seems like I need to tune my thinking to this style of programming a bit more, so please give me some examples&explanations.
Thanks!
Upvotes: 0
Views: 1890
Reputation: 4536
If you just need to loop over linear results that are returned in batches (as opposed to recursing a tree), you can use a repeating flux whose starting point changes on each repeat.
Here's a full example that just simulates the api call. You can substitute in your WebClient call in queryFrom
:
public class Main {
private static class ApiResp {
private final int last;
private ApiResp(int last) {
this.last = last;
}
}
public static void main(String[] args) {
queryBetween(0, 15)
.doOnNext(apiResp -> System.out.println(apiResp.last))
.blockLast();
}
public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
// The starting point of the next iteration
final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
return Flux
// defer will cause a new Flux with a new starting point to be created for each subscription
.defer(() -> queryFrom(nextIterationStart.get()))
// update the starting point of the next iteration
.doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
// repeat with a new subscription if we haven't reached the end yet
.repeat(() -> nextIterationStart.get() < endExclusive)
// make sure we didn't go past the end if queryFrom returned more results than we need
.takeWhile(apiResp -> apiResp.last < endExclusive);
}
public static Flux<ApiResp> queryFrom(int start) {
// simulates an api call that always returns 10 results from the starting point
return Flux.range(start, 10)
.map(ApiResp::new);
}
}
Upvotes: 2