greengold

Reputation: 1333

recursive API call with WebClient and Reactor 3.0

I'm finally learning functional-style programming with Reactor, so I'm new to this.

The first thing I want to do is call an external API with WebClient. The call needs to be recursive: each response provides the next value of the calling parameter, and I need to use it in the next call until the trivial case is met.

Here's what I came up with:

    Flux.from(p -> queryUntilNow())
            .flatMap(res -> // res is object )
            .subscribe( // process )



private Flux<ApiResp> queryUntilNow() {
    return Flux.from(p -> {
        queryAPI(since)
                .doOnError(System.out::println)
                .subscribe(apiResp -> {
                    if (since == apiResp.last) return;

                    since = apiResp.last;
                    queryUntilNow();
                });
    });
}

private Flux<ApiResp> queryAPI(int last) {
    Flux<ApiResp> resp = kapi.get()
            .uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
            .retrieve()
            .bodyToFlux(ApiResp.class);

    return resp;
}

It seems I need to adjust my thinking to this style of programming a bit more, so please give me some examples and explanations.

Thanks!

Upvotes: 0

Views: 1890

Answers (1)

Phil Clay

Reputation: 4536

If you just need to loop over linear results that are returned in batches (as opposed to recursing a tree), you can use a repeating flux whose starting point changes on each repeat.

Here's a full example that just simulates the API call. You can substitute your WebClient call in queryFrom:

import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

public class Main {

    private static class ApiResp {
        private final int last;
        private ApiResp(int last) {
            this.last = last;
        }
    }

    public static void main(String[] args) {
        queryBetween(0, 15)
                .doOnNext(apiResp -> System.out.println(apiResp.last))
                .blockLast();
    }

    public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
        // The starting point of the next iteration
        final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
        return Flux
                // defer will cause a new Flux with a new starting point to be created for each subscription
                .defer(() -> queryFrom(nextIterationStart.get()))
                // update the starting point of the next iteration
                .doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
                // repeat with a new subscription if we haven't reached the end yet
                .repeat(() -> nextIterationStart.get() < endExclusive)
                // make sure we didn't go past the end if queryFrom returned more results than we need
                .takeWhile(apiResp -> apiResp.last < endExclusive);
    }

    public static Flux<ApiResp> queryFrom(int start) {
        // simulates an api call that always returns 10 results from the starting point
        return Flux.range(start, 10)
                .map(ApiResp::new);
    }
}
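
If each call really returns a single response whose last value feeds the next call, as the question describes, Reactor's expand operator is another way to express the recursion without shared mutable state. This is a different technique from the repeat approach above, shown only as a minimal sketch. The class name ExpandPaging, the since field on ApiResp, and the queryApi stand-in (with its now parameter) are assumptions made for illustration; in real code queryApi would wrap the WebClient call.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ExpandPaging {

    // Minimal stand-in for the question's response type (the since field is an assumption).
    static final class ApiResp {
        final int since; // cursor this response was requested with
        final int last;  // cursor to use for the next request

        ApiResp(int since, int last) {
            this.since = since;
            this.last = last;
        }
    }

    // Hypothetical simulation of the API call: the cursor advances by at most 10 and is capped at `now`.
    static Mono<ApiResp> queryApi(int since, int now) {
        return Mono.fromSupplier(() -> new ApiResp(since, Math.min(since + 10, now)));
    }

    // Emits the first response, then keeps requesting from each response's `last`
    // until a response no longer advances the cursor (the trivial case).
    static Flux<ApiResp> queryUntilNow(int since, int now) {
        return queryApi(since, now)
                .expand(resp -> {
                    if (resp.last == resp.since) {
                        // Nothing new was returned, so stop recursing.
                        return Mono.<ApiResp>empty();
                    }
                    return queryApi(resp.last, now);
                });
    }

    public static void main(String[] args) {
        queryUntilNow(0, 25)
                .doOnNext(resp -> System.out.println(resp.since + " -> " + resp.last))
                .blockLast();
    }
}
```

Note that the final response, the one that did not advance the cursor, is still emitted downstream; add a filter after expand if that is not wanted.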

Upvotes: 2
