TheUnreal

Reputation: 24492

Firebase lazy loading with rxjs

I have the following code to get the latest chat messages:

const pageSize = 10;
this.notifier = new Subject<any>();
let last: Observable<any>;

let infiniteList = Observable

  // Use zip to combine the notifier's emissions with the last
  // child value:

  .zip(this.notifier, Observable.defer(() => last))

  // Use concatMap to emit a page of children into the
  // composed observable (note that first is used to complete
  // the inner list):

  .concatMap(([unused, last]) => this.af.database.list("chat_messages", {
      query: {

        // If there is a last value, start at that value but ask
        // for one more:

        limitToLast: last ? (pageSize + 1) : pageSize
      }
    })
    .first()
  )

  // Use scan to accumulate the page into the infinite list:

  .scan((acc, list) => {

    // If this isn't the initial page, the page was started
    // at the last value, so remove it from the beginning of
    // the list:

    if (acc.length > 0) {
      list.shift();
    }
    return acc.concat(list);
  }, [])

  // Use share so that the last observable (see below) doesn't
  // result in a second subscription:

  .share();

// Each time a page is emitted, map to its last child value so
// that it can be fed back into the composed infinite list:

last = infiniteList
  .map((list) => {
    list.reverse();
    if (list.length === 0) {
      return null;
    }
    return list[list.length - 1].date_published;
  })
  .startWith(null);

infiniteList.subscribe((list) => {
  this.chatMessages = list;
});
this.notifier.next();

Whenever the user scrolls to the bottom of the list, I call this.notifier.next() to read more of the chat history, 10 messages at a time.

The problem: the first 10 messages load fine, but when I scroll to the bottom to load more, I get the same messages again in the opposite order. For example, I had this:

1- Good

2- Fine, you?

10- Hi all! How are you?

---scroll to the previous 10 messages---

12-. Hi all! How are you?

13-. Fine, you?

14-. Good

Upvotes: 1

Views: 585

Answers (1)

cartant

Reputation: 58420

The following implementation should work just fine.

Note that the composed last observable uses filter to stop emitting values once paging reaches the end of the data (or the beginning, depending on your point of view), and that the key it feeds back is that of the first element in each page.

Also, each page needs to be reversed, as Firebase always returns results in ascending order. The reversal should happen in the observable that is composed from pages, and the page needs to be copied first, because reverse operates in place.

import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/zip";
import "rxjs/add/operator/concatMap";
import "rxjs/add/operator/filter";
import "rxjs/add/operator/first";
import "rxjs/add/operator/map";
import "rxjs/add/operator/scan";
import "rxjs/add/operator/share";
import "rxjs/add/operator/startWith";

const pageSize = 100;
let notifier = new Subject<any>();
let last: Observable<any>;

let pages = Observable

  // Use zip to combine the notifier's emissions with the last
  // child value:

  .zip(notifier, Observable.defer(() => last))

  // Use concatMap to emit a page of children into the
  // composed observable (note that first is used to complete
  // the inner list):

  .concatMap(([unused, last], index) => this.af.database
    .list("chat_messages", {
      query: {
        endAt: last,
        limitToLast: (index === 0) ? pageSize : (pageSize + 1)
      }
    })
    .first()
  )

  // Use share so that the last observable (see below) doesn't
  // result in a second subscription:

  .share();

// Each time a non-empty page is emitted, map to the key of its first
// (oldest) child so that it can be fed back in as the next endAt:

last = pages
  .filter((page) => page.length > 0)
  .map((page) => page[0].$key)
  .startWith(undefined);

// Use scan to accumulate the pages into the infinite list. Copy
// the page - using slice - before reversing it:

let infiniteList = pages
  .scan((list, page) => {
    page = page.slice();
    page.reverse();
    if (list.length > 0) {
      page.shift();
    }
    return list.concat(page);
  }, []);

infiniteList.subscribe(observer);

// Each time the notifier emits, another page will be retrieved
// and added to the infinite list:

notifier.next();
notifier.next();
notifier.next();
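
The accumulation step passed to scan above can be checked in isolation, without Firebase or RxJS. The following is a minimal sketch: the Message shape and the sample keys are made up for illustration, and mergePage is a hypothetical name for the same logic as the scan callback.

```typescript
// Hypothetical item shape for illustration; AngularFire2 list items carry a $key.
interface Message {
  $key: string;
  text: string;
}

// Merge one page (ascending, as returned by Firebase) into a newest-first list.
function mergePage(list: Message[], page: Message[]): Message[] {
  // Copy before reversing: Array.prototype.reverse mutates in place, and the
  // same page array is also seen by the observable that derives `last`.
  const reversed = page.slice().reverse();
  // Every page after the first ends at the previous page's oldest key, so its
  // newest item duplicates the end of the accumulated list.
  if (list.length > 0) {
    reversed.shift();
  }
  return list.concat(reversed);
}

const firstPage: Message[] = [
  { $key: "k3", text: "c" },
  { $key: "k4", text: "d" },
  { $key: "k5", text: "e" },
];
const secondPage: Message[] = [
  { $key: "k1", text: "a" },
  { $key: "k2", text: "b" },
  { $key: "k3", text: "c" }, // overlaps with the oldest item of firstPage
];

const merged = [firstPage, secondPage].reduce(mergePage, [] as Message[]);
const mergedKeys = merged.map((m) => m.$key);
console.log(mergedKeys.join(",")); // k5,k4,k3,k2,k1
```

Because the page is copied with slice, firstPage and secondPage are left in their original ascending order, which is what the last observable relies on when it reads page[0].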

The implementation above will only work reliably when using orderByKey. The Firebase SDK's startAt method supports an optional key parameter when ordering by child, value or priority. However, the endAt method is documented as only supporting key when ordering by priority. That means that paging using endAt and orderByChild will not be reliable: if the child values are not unique and there are multiple items with the same child value, paging will not always be possible.
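
To see why repeated child values are a problem, here is a rough in-memory model of the endAt plus limitToLast window. It is only a sketch under stated assumptions: queryPage and pageBackwards are hypothetical helpers, not Firebase SDK calls, and the data is assumed to be pre-sorted by value and then by key.

```typescript
interface Item {
  key: string;
  value: number;
}

// Simplified stand-in for endAt(value) + limitToLast(limit) on data that is
// already sorted by value, then key. Not the Firebase SDK, only the windowing.
function queryPage(sorted: Item[], endAt: number | undefined, limit: number): Item[] {
  const upTo = endAt === undefined ? sorted : sorted.filter((i) => i.value <= endAt);
  return upTo.slice(-limit);
}

// Page backwards in the same way as the answer: the first item of each page
// supplies the endAt for the next request, and the overlapping item is dropped.
function pageBackwards(sorted: Item[], pageSize: number, requests: number): string[] {
  let keys: string[] = [];
  let last: number | undefined = undefined;
  for (let index = 0; index < requests; index++) {
    const page = queryPage(sorted, last, index === 0 ? pageSize : pageSize + 1);
    if (page.length === 0) {
      break;
    }
    const reversed = page.slice().reverse();
    if (keys.length > 0) {
      reversed.shift();
    }
    keys = keys.concat(reversed.map((i) => i.key));
    last = page[0].value;
  }
  return keys;
}

// Unique ordering values: three requests walk back through all five items.
const uniqueItems: Item[] = [
  { key: "a", value: 1 }, { key: "b", value: 2 }, { key: "c", value: 3 },
  { key: "d", value: 4 }, { key: "e", value: 5 },
];
const uniqueResult = pageBackwards(uniqueItems, 2, 3).join(",");
console.log(uniqueResult); // e,d,c,b,a

// Four items share the value 2, which is more than one page can hold, so the
// window stops moving: "d" and "c" repeat, and "b" and "a" are never reached.
const tiedItems: Item[] = [
  { key: "a", value: 1 }, { key: "b", value: 2 }, { key: "c", value: 2 },
  { key: "d", value: 2 }, { key: "e", value: 2 }, { key: "f", value: 3 },
];
const tiedResult = pageBackwards(tiedItems, 2, 3).join(",");
console.log(tiedResult); // f,e,d,c,d,c
```

With keys as the ordering value, every item is unique, so the window always moves and this stall cannot occur.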

Realtime updates for the infinite list are more complicated, as the limit-based mechanism can easily produce duplicate or missing items. However, it is reasonably simple to include realtime additions at the head of the list:

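// In addition to the imports above, this version needs:
import "rxjs/add/observable/combineLatest";
import "rxjs/add/operator/switchMap";
import "rxjs/add/operator/take";

let notifier = new Subject<any>();
let last: Observable<any>;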

let pages = Observable
  .zip(notifier, Observable.defer(() => last))
  .concatMap(([unused, last], index) => this.af.database
    .list("chat_messages", {
      query: {
        endAt: last,
        limitToLast: (index === 0) ? pageSize : (pageSize + 1)
      }
    })
    .first()
  )
  .share();

last = pages
  .filter((page) => page.length > 0)
  .map((page) => page[0].$key)
  .startWith(undefined);

let tail = pages
  .scan((list, page) => {
    page = page.slice();
    page.reverse();
    if (list.length > 0) {
      page.shift();
    }
    return list.concat(page);
  }, []);

// When the first non-empty page is emitted, create an observable
// that watches items starting at the newest item in that page
// (the last element, as pages arrive in ascending order):

let head = pages
  .filter((page) => page.length > 0)
  .take(1)
  .switchMap((page) => this.af.database
    .list("chat_messages", {
      query: {
        startAt: page[page.length - 1].$key
      }
    })
  )
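  // Drop the overlapping first item, then reverse the copy so that the
  // newest additions come first, matching the order of the tail:
  .map((list) => list.slice(1).reverse());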

// And combine that with the pages:

let infiniteList = Observable
  .combineLatest(
    head,
    tail,
    (head, tail) => head.concat(tail)
  );

infiniteList.subscribe(observer);

notifier.next();
notifier.next();
notifier.next();
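
How the head and tail fit together can be sketched with plain arrays. This is an illustration only: combineHeadAndTail is a hypothetical helper, the keys are made up, and reversing the live items is an assumption that the combined list should stay newest-first like the tail.

```typescript
// Hypothetical item shape; only the key matters for this sketch.
interface Message {
  $key: string;
}

// `liveList` is the realtime list in ascending order, starting at the newest
// item of the first page; `tailList` is the accumulated newest-first list.
function combineHeadAndTail(liveList: Message[], tailList: Message[]): Message[] {
  // The first live item is the one the query started at, so it is already in
  // the tail. Drop it, then reverse the copy so that newer items come first.
  const headList = liveList.slice(1).reverse();
  return headList.concat(tailList);
}

const tailList: Message[] = [{ $key: "k3" }, { $key: "k2" }, { $key: "k1" }];
const liveList: Message[] = [{ $key: "k3" }, { $key: "k4" }, { $key: "k5" }];

const combinedKeys = combineHeadAndTail(liveList, tailList).map((m) => m.$key);
console.log(combinedKeys.join(",")); // k5,k4,k3,k2,k1
```

Before any new message arrives, the live list contains only the starting item, so the head is empty and the combined list is just the tail.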

Upvotes: 3
