I have an API that returns a list of entities with a limit of 100 entities. If there are more entities, it returns a token for the next page.
I want to create a Flux that emits the entities of all the pages, but loads each page only when it is needed (that is, when its elements are actually requested).
I wrote this code:
class Page {
    String token;
    List<Object> entities;
}

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }
        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
    });
}
And it works, almost.
If I request 99 elements, the first page is loaded and my Flux emits the 99 elements.
If I request 150 elements, the first and second pages are loaded and my Flux emits the 150 elements.
However, if I request exactly 100 elements, both the first and second pages are loaded (and my Flux emits the 100 elements). The problem is that the second page is loaded even though I never requested a 101st element.
Current behavior:
subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100
Expected: page 2 is loaded only after the last request(1).
It's almost as if there is a prefetch somewhere, but I fail to see where. Any ideas?
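To make the expected semantics concrete, here is a minimal sketch in plain java.util (no Reactor) of "load the next page only when an element beyond the current page is asked for". It is only a pull-based analogy, not a Reactor solution. The class name LazyPageIterator, the blocking fetch function, the demo method pagesLoadedAfter and the convention that the first page is requested with a null token are all assumptions made for this illustration.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

final class LazyPageIterator implements Iterator<Object> {

    /** Same shape as the Page class in the question. */
    static final class Page {
        final String token;
        final List<Object> entities;

        Page(String token, List<Object> entities) {
            this.token = token;
            this.entities = entities;
        }
    }

    // Hypothetical blocking loader; stands in for fct from the question.
    private final Function<String, Page> fetch;
    private Iterator<Object> current = Collections.emptyIterator();
    private String nextToken;   // token of the page to load next (null for the first page)
    private boolean started;    // false until the first page has been fetched

    LazyPageIterator(Function<String, Page> fetch) {
        this.fetch = fetch;
    }

    @Override
    public boolean hasNext() {
        // Fetch only when the current page is exhausted AND the caller asks for more.
        while (!current.hasNext() && (!started || nextToken != null)) {
            Page page = fetch.apply(nextToken);
            started = true;
            nextToken = page.token;
            current = page.entities.iterator();
        }
        return current.hasNext();
    }

    @Override
    public Object next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Demo: consume `elements` items from a 100 + 50 element source and count page loads. */
    static int pagesLoadedAfter(int elements) {
        int[] loads = {0};
        Function<String, Page> fetch = token -> {
            loads[0]++;
            return token == null
                    ? new Page("p2", Collections.<Object>nCopies(100, "a"))
                    : new Page(null, Collections.<Object>nCopies(50, "b"));
        };
        Iterator<Object> it = new LazyPageIterator(fetch);
        for (int i = 0; i < elements; i++) {
            it.next();
        }
        return loads[0];
    }

    public static void main(String[] args) {
        System.out.println(pagesLoadedAfter(99));   // 1
        System.out.println(pagesLoadedAfter(100));  // 1
        System.out.println(pagesLoadedAfter(101));  // 2
    }
}
```

In this sketch, consuming exactly 100 elements touches only the first page; the second fetch happens on the 101st call to next(), which is the behavior I would like from the Flux.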
Upvotes: 2
Views: 1182
Reputation: 476
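OK, I found it. There was no prefetch per se. The culprit was in fact Flux.defer, which loads the next page on subscription rather than on request: concatWith subscribes to it as soon as the first page completes, and subscribing is enough to trigger the load.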
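To illustrate the difference between "on subscription" and "on request" without depending on Reactor, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces. The names loadOnSubscribe, loadOnRequest, ListSubscription, Recorder and loadsAfterSubscribe are made up for this example. It is single-threaded and deliberately skips parts of the Reactive Streams rules (for instance, it silently ignores non-positive requests), so treat it as an illustration rather than a usable publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SubscribeVsRequest {

    /** Calls the loader as soon as someone subscribes, before any request(n). */
    static <T> Flow.Publisher<T> loadOnSubscribe(Supplier<List<T>> loader) {
        return subscriber -> {
            List<T> items = loader.get(); // loaded at subscription time
            subscriber.onSubscribe(new ListSubscription<T>(subscriber, () -> items));
        };
    }

    /** Calls the loader only when the first positive request(n) arrives. */
    static <T> Flow.Publisher<T> loadOnRequest(Supplier<List<T>> loader) {
        return subscriber -> subscriber.onSubscribe(new ListSubscription<T>(subscriber, loader));
    }

    /** Emits the items of a list, obtaining the list lazily on the first request. */
    static final class ListSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Supplier<List<T>> source;
        private List<T> items; // null until the first request
        private int index;
        private boolean done;

        ListSubscription(Flow.Subscriber<? super T> subscriber, Supplier<List<T>> source) {
            this.subscriber = subscriber;
            this.source = source;
        }

        @Override
        public void request(long n) {
            if (done || n <= 0) {
                return; // simplification: the spec asks for onError on n <= 0
            }
            if (items == null) {
                items = source.get();
            }
            while (n-- > 0 && index < items.size()) {
                subscriber.onNext(items.get(index++));
            }
            if (index == items.size() && !done) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Subscriber that records items and requests nothing on its own. */
    static final class Recorder<T> implements Flow.Subscriber<T> {
        Flow.Subscription subscription;
        final List<T> received = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    /** Subscribes without requesting anything and reports how often the loader ran. */
    static int loadsAfterSubscribe(boolean eager) {
        AtomicInteger loads = new AtomicInteger();
        Supplier<List<Integer>> loader = () -> {
            loads.incrementAndGet();
            return List.of(100, 101);
        };
        Flow.Publisher<Integer> publisher = eager ? loadOnSubscribe(loader) : loadOnRequest(loader);
        publisher.subscribe(new Recorder<Integer>());
        return loads.get();
    }

    public static void main(String[] args) {
        System.out.println(loadsAfterSubscribe(true));  // 1: loaded although nothing was requested
        System.out.println(loadsAfterSubscribe(false)); // 0: nothing loaded until request(n)
    }
}
```

The first variant corresponds to what happened with Flux.defer in the question's code; the second is the behavior the workaround below tries to obtain.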
A quick (and dirty) test to fix that was:
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }
        return Flux
            .fromIterable(page.entities)
            .concatWith(
                // Flux.defer(() -> load(page.token, fct))
                Flux.create(s -> {
                    DelegateSubscriber[] ref = new DelegateSubscriber[1];
                    s.onRequest(l -> {
                        if (ref[0] == null) {
                            ref[0] = new DelegateSubscriber(s);
                            load(page.token, fct).subscribe(ref[0]);
                        }
                        ref[0].request(l);
                    });
                }));
    });
}
static class DelegateSubscriber extends BaseSubscriber<Object> {
    FluxSink<Object> delegate;

    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // nothing: demand is forwarded explicitly through request(l) above
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnComplete() {
        // forward completion, otherwise the outer Flux never terminates
        delegate.complete();
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }
}
Upvotes: 1