Tiller
Tiller

Reputation: 476

Request next in Reactor's Flux only when needed

I have an API that returns a list of entities with a limit of 100 entities. If there are more entities, it returns a token for the next page.

I want to create a flux that returns all the entities (of all the pages) but only if needed (if requested).

I wrote this code:

class Page {
    String token;
    List<Object> entities;
}

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
    });
}

And it works - almost

If I request 99 elements, first page is loaded and my flux contains the 99 elements.

If I request 150 elements, first page and second pages are loaded and my flux contains the 150 elements.

However, if I request 100 elements, the first and second pages are loaded (and my flux contains the 100 elements). My issue here is that the second page was loaded where I did not request a 101st element.

Current behavior:

subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100

Expected is: loading of page 2 happens after the last request(1)

It's almost like there is a prefetch some where, but I fail to see where. Any ideas?

Upvotes: 2

Views: 1182

Answers (1)

Tiller
Tiller

Reputation: 476

Ok, I found it. There were no prefetch per-say. It was in fact the Flux.defer that loads the next page on subscription, and not on request.

A quick (and dirty) test to fix that was:

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux
                .fromIterable(page.entities)
                .concatWith(
                        // Flux.defer(() -> load(page.token, fct))
                        Flux.create(s -> {
                            DelegateSubscriber[] ref = new DelegateSubscriber[1];

                            s.onRequest(l -> {
                                if (ref[0] == null) {
                                    ref[0] = new DelegateSubscriber(s);
                                    load(page.token, fct).subscribe(ref[0]);
                                }
                                ref[0].request(l);
                            });
                        }));
    });
}

static class DelegateSubscriber extends BaseSubscriber<Object> {

    FluxSink<Object> delegate;

    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // nothing
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }
}

Upvotes: 1

Related Questions