Kevin
Kevin

Reputation: 205

RXJS : Idiomatic way to create an observable stream from a paged interface

I have paged interface. Given a starting point a request will produce a list of results and a continuation indicator.

I've created an observable that is built by constructing and flat mapping an observable that reads the page. The result of this observable contains both the data for the page and a value to continue with. I pluck the data and flat map it to the subscriber. Producing a stream of values.

To handle the paging I've created a subject for the next page values. It's seeded with an initial value then each time I receive a response with a valid next page I push to the pages subject and trigger another read until such time as there is no more to read.

Is there a more idiomatic way of doing this?

    function records(start = 'LATEST', limit = 1000) {
      let pages = new rx.Subject();

      this.connect(start)
        .subscribe(page => pages.onNext(page));

      let records = pages
        .flatMap(page => {
          return this.read(page, limit)
            .doOnNext(result => {
              let next = result.next;
              if (next === undefined) {
                pages.onCompleted();
              } else {
                pages.onNext(next);
              }
            });
        })
        .pluck('data')
        .flatMap(data => data);

      return records;
    }

Upvotes: 3

Views: 1089

Answers (1)

Brandon
Brandon

Reputation: 39212

That's a reasonable way to do it. It has a couple of potential flaws in it (that may or may not impact you depending upon your use case):

  1. You provide no way to observe any errors that occur in this.connect(start)
  2. Your observable is effectively hot. If the caller does not immediately subscribe to the observable (perhaps they store it and subscribe later), then they'll miss the completion of this.connect(start) and the observable will appear to never produce anything.
  3. You provide no way to unsubscribe from the initial connect call if the caller changes its mind and unsubscribes early. Not a real big deal, but usually when one constructs an observable, one should try to chain the disposables together so it call cleans up properly if the caller unsubscribes.

Here's a modified version:

  1. It passes errors from this.connect to the observer.
  2. It uses Observable.create to create a cold observable that only starts is business when the caller actually subscribes so there is no chance of missing the initial page value and stalling the stream.
  3. It combines the this.connect subscription disposable with the overall subscription disposable

Code:

    function records(start = 'LATEST', limit = 1000) {
        return Rx.Observable.create(observer => {
            let pages = new Rx.Subject();
            let connectSub = new Rx.SingleAssignmentDisposable();
            let resultsSub = new Rx.SingleAssignmentDisposable();
            let sub = new Rx.CompositeDisposable(connectSub, resultsSub);

            // Make sure we subscribe to pages before we issue this.connect()
            // just in case this.connect() finishes synchronously (possible if it caches values or something?)
            let results = pages
                .flatMap(page => this.read(page, limit))
                .doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
                .flatMap(r => r.data);
            resultsSub.setDisposable(results.subscribe(observer));

            // now query the first page
            connectSub.setDisposable(this.connect(start)
                .subscribe(p => pages.onNext(p), e => observer.onError(e)));

            return sub;
        });
    }

Note: I've not used the ES6 syntax before, so hopefully I didn't mess anything up here.

Upvotes: 3

Related Questions