dierre
dierre

Reputation: 7220

How to concatWith using information from previous Observable for pagination

Let's say I have a blocking method with is called List<UUID> listOf(int page). If I want to paginate something like this, one idea is to do something like this:

public Observable<UUID> allOf(int initialPage) {
    return fromCallable( () -> listOf(initialPage))
            .concatWith( fromCallable( () -> allOf(initialPage + 1)))
            .flatMap(x -> from(x));
}

If my service doesn't use the page number but the last element of the list to find next elements, how can I achieve it with RxJava?

I would still like to obtain the effect of doing something like allOf(0).take(20) and obtain, with concatWith, the call to the second Observable when the first one has completed.

But how can I do it when I need information from the previous call?

Upvotes: 1

Views: 663

Answers (1)

akarnokd
akarnokd

Reputation: 70007

You could use a subject to send back the next page number to the beginning of a sequence:

List<Integer> service(int index) {
    System.out.println("Reading " + index);
    List<Integer> list = new ArrayList<>();
    for (int i = index; i < index + 20; i++) {
        list.add(i);
    }
    return list;
}

Flowable<List<Integer>> getPage(int index) {
    FlowableProcessor<Integer> pager = UnicastProcessor.<Integer>create()
        .toSerialized();
    pager.onNext(index);

    return pager.observeOn(Schedulers.trampoline(), true, 1)
    .map(v -> {
        List<Integer> list = service(v);
        pager.onNext(list.get(list.size() - 1) + 1);
        return list;
    })
    ;
}

@Test
public void testPager() {
    getPage(0).take(20)
    .subscribe(System.out::println, Throwable::printStackTrace);
}

Upvotes: 1

Related Questions