VoY
VoY

Reputation: 5699

How to handle open ended pagination using reactive streams?

I am using RxJS for all async request handling in my redux application. One thing in particular I am using it for is enumerating all the results from a paginated AWS API. Those APIs generally do not allow random page jumping, one must call the API with a special token (nextToken) from the previous call and follow the sequence. The list is complete when there is no nextToken sent with the response.

What I would like to do is have a stream of the partial page results that I can at the end flatten into one array. The expected benefit is that when the user leaves a particular screen in the app that fetched the list, I can unsubscribe them from the stream and the remaining pages (which will never get displayed) won't be fetched.

I am struggling to understand how I can create such a stream relatively "purely" without going into the territory of Rx.Subject and pushing the tokens into it manually. I fear that if I do it "manually" like this, I will be exposed to potential memory leaks or other bugs resulting from failure to unsubscribe and other coding errors.

Upvotes: 1

Views: 698

Answers (1)

cartant
cartant

Reputation: 58420

You can use the expand operator to implement paging. And you can control the retrieval of the pages using a notifier observable. The take and concatMap operators can be used to ensure the next page isn't retrieved until the notifier emits.

The notifier can be whatever observable is suitable. For example, you could use a Subject and call next on it when the user scrolls to the bottom of the list.

This snippet pages through starred repos using the GitHub API - which uses the Link header to indicate the URI for the next page. Conceptually, this is the same mechanism as with the AWS API (but I can run it in a snippet).

The comments should explain how it works.

// The GitHub API uses the Link header for paging.
// Map the response to an object that contains the content and the
// URI for the next page (if there is one).

function get(uri) {
  return Rx.Observable.ajax
    .get(uri)
    .map(response => ({
      content: response.response,
      next: next(response)
    }));
}

function next(response) {
    const link = response.xhr.getResponseHeader("Link");
    if (link) {
        const match = link.match(/<([^>]+)>;\s*rel="next"/);
        if (match) {
            return match[1];
        }
    }
    return null;
}

// Use a notifier observable to control the paging.
// Here, the notifier is a timer, but it could be a subject that's
// triggered when the used scrolls to the bottom of the list, etc.

const notifier = Rx.Observable.defer(() => {
  console.log("waiting...");
  return Rx.Observable
    .timer(1000)
    .do(() => console.log("notifying..."));
});

// Use the expand operator to retrieve the next page, if there is
// one. The concatMap operator will ensure the next page isn't retrieved
// until the notifier next emits a value (which is ignored).
// The mergeMap operator will flatten the repos in the content array
// into the observable stream - so the observable emits repos and not
// arrays of repos.

const starred = get("https://api.github.com/users/voy/starred")
  .expand(({ next }) => next ?
    notifier.take(1).concatMap(() => get(next)) :
    Rx.Observable.empty()
  )
  .mergeMap(({ content }) => content);

starred.subscribe(
  repo => console.log(`  ${repo.name}`),
  undefined,
  () => console.log("finished paging")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Upvotes: 1

Related Questions