RxJS: load and cache the first N values when notified, then emit values one by one on subsequent notifications

I'm building an Angular app where I'm trying to implement as much as possible in a reactive manner using RxJS.

I'm implementing something like a carousel, where the next item is shown on user action. Items are received from the backend one at a time. To keep the user experience smooth, I want to preload the first N items before showing anything to the user.

After N items are loaded, I need to show the first loaded one. When the user clicks next, I emit the next preloaded item and trigger a preload of one more item, so that the number of preloaded items is always N.

It's a kind of buffer, so the user doesn't have to wait on each step: the next items are preloading while the user is still viewing the previous ones.
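To pin down the behaviour I'm after independently of RxJS, here is a rough sketch of the bookkeeping as a plain TypeScript state transition. All names (`CarouselState`, `CarouselEvent`, `step`, `loadsToStart`) are made up for this illustration and are not part of my app or of RxJS. It assumes that showing the first item also starts one more request, and that a click arriving while nothing is preloaded is simply ignored.

```typescript
// Hypothetical names throughout; nothing here comes from RxJS or from the app.

interface CarouselState<T> {
  ready: T[];              // preloaded items not yet shown, oldest first
  current: T | undefined;  // item currently displayed
  started: boolean;        // true once the first N items have arrived
}

type CarouselEvent<T> =
  | { kind: 'loaded'; item: T }  // one backend request finished
  | { kind: 'next' };            // the user asked for the next item

interface StepResult<T> {
  state: CarouselState<T>;
  loadsToStart: number;          // how many new requests the caller should fire
}

function initialState<T>(): CarouselState<T> {
  return { ready: [], current: undefined, started: false };
}

// Pure transition: given the state, one event and the buffer size n,
// return the new state and the number of requests to start.
function step<T>(state: CarouselState<T>, event: CarouselEvent<T>, n: number): StepResult<T> {
  if (event.kind === 'loaded') {
    const ready = [...state.ready, event.item];
    if (!state.started && ready.length >= n) {
      // First N items are in: show the oldest one and request a replacement.
      const [first, ...rest] = ready;
      return { state: { ready: rest, current: first, started: true }, loadsToStart: 1 };
    }
    return { state: { ...state, ready }, loadsToStart: 0 };
  }
  // event.kind === 'next'
  if (!state.started || state.ready.length === 0) {
    // Nothing preloaded yet: the click is ignored in this sketch.
    return { state, loadsToStart: 0 };
  }
  const [next, ...rest] = state.ready;
  return { state: { ready: rest, current: next, started: true }, loadsToStart: 1 };
}
```

The caller would fire N requests up front and then `loadsToStart` more after every step. In an RxJS pipeline the same transition could presumably sit inside `scan`, but that wiring is not shown here.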

I was thinking of using something like bufferCount(N) on the loaded items, then flattening the resulting array with mergeMap((ar) => from(ar)), and using zip with a separate notifier Subject to trigger emissions from that buffer. But it doesn't work very well: every N emissions there is a glitch where I first see one item and then, almost immediately, another one.

I'm not sure how to implement this better, even though it seems like it should be a common use case.

----Edit----

Yes, items are loaded via HTTP. Here's some code that I have at the moment (not really functional, just the concept):

//..........
// loadRndItem() loads one item using HttpClient
this.loadingBuffer$ = this.nextItemSubj.pipe(
  // here should somehow trigger first N loading processes
  mergeMap(() => this.loadRndItem()), // mergeMap is the current name of flatMap
  bufferCount(this.bufferSize),
  mergeMap((ar) => from(ar)),
  share(),
  takeUntil(this.endSubj)
);

this.currentItem$ = zip(
  this.loadingBuffer$,
  this.nextItemSubj
).pipe(
  map(([val, _]) => val),
  share(),
  takeUntil(this.gameEndSubj)
);

//..........
function nextItem(): void {
  this.nextItemSubj.next();
}

Upvotes: 0

Views: 298

Answers (2)

martin

Reputation: 96999

I think you could do it like this:

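import { Subject, concat, forkJoin, of, zip } from 'rxjs';
import { concatAll, concatMap, map, startWith } from 'rxjs/operators';

// N is the buffer size. makeNRequests(n) is assumed to return an array of
// n request Observables, which is what forkJoin() expects.
const click$ = new Subject();
const loadMore$ = new Subject<number>();

const buffer$ = concat(of(N), loadMore$)
  .pipe(
    // Unpack the array of results with `concatAll()` into separate emissions
    concatMap(n => forkJoin(makeNRequests(n)).pipe(concatAll())),
  );

// `startWith(1)` pairs the first buffered item with a value so it shows without a click.
zip(buffer$, click$.pipe(startWith(1)))
  .pipe(
    map((result, index) => {
      if (index > 0) { // We need `index` so that's why this is not inside `tap`.
        loadMore$.next(1);
      }
      return result[0];
    }),
  )
  .subscribe(console.log);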

Live demo: https://stackblitz.com/edit/rxjs-kdh9uk?file=index.ts
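
For anyone wondering why this keeps the buffer topped up: `zip` queues values from each input and emits a pair only once both inputs have one. `startWith(1)` supplies the partner for the very first item, and every later click releases one buffered item and asks for one more request. Below is a rough, library-free model of that pairing. `ZipModel` and its methods are invented for this illustration and are not how RxJS implements `zip`; the model only mimics the behaviour for two inputs.

```typescript
// Simplified model of zip() for two inputs. Hypothetical names, not RxJS code.
class ZipModel<A, B> {
  private left: A[] = [];
  private right: B[] = [];
  private index = 0;
  private onPair: (pair: [A, B], index: number) => void;

  constructor(onPair: (pair: [A, B], index: number) => void) {
    this.onPair = onPair;
  }

  pushLeft(value: A): void {
    this.left.push(value);
    this.drain();
  }

  pushRight(value: B): void {
    this.right.push(value);
    this.drain();
  }

  // Emit pairs for as long as both queues hold at least one value.
  private drain(): void {
    while (this.left.length > 0 && this.right.length > 0) {
      const pair: [A, B] = [this.left.shift() as A, this.right.shift() as B];
      this.onPair(pair, this.index++);
    }
  }
}

const shown: string[] = [];
let requested = 0; // stands in for the number of loadMore$.next(1) calls

const model = new ZipModel<string, number>(([item], index) => {
  if (index > 0) {
    requested++; // every pairing after the first asks for one more item
  }
  shown.push(item);
});

model.pushRight(1);                                    // plays the role of startWith(1)
['a', 'b', 'c'].forEach(item => model.pushLeft(item)); // initial N = 3 results arrive
model.pushRight(1);                                    // one user click
```

After these calls `shown` holds `'a'` and `'b'`, `'c'` is still queued, and `requested` is 1: the first item appeared without a click and the single click triggered a single extra request.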

Upvotes: 1

I think I managed to get the behavior I wanted.

It might be a bit hacky, though, and there is still one manual subscription. Here's the code:

nextItemSubj: Subject<void> = new Subject();

// Load CACHE_SIZE items on init, and load one more on each nextItemSubj emission.
// Spread each loaded array into separate emissions.
this.itemsBuffer$ = concat(
  from(new Array(CACHE_SIZE).fill(null)),
  this.nextItemSubj
).pipe(
  mergeMap(() => this.loadRndItem()), // mergeMap is the current name of flatMap
  bufferCount(CACHE_SIZE),
  mergeMap((ar) => from(ar)),
  share(),
  takeUntil(this.endSubj)
);

// Emit preloaded values one by one when nextItemSubj.next() is called.
this.currentQuizData$ = zip(
  this.itemsBuffer$,
  this.nextItemSubj
).pipe(
  map(([val, _]) => val),
  shareReplay(),
  takeUntil(this.endSubj)
);

// When itemsBuffer$ first emits (after CACHE_SIZE items are preloaded),
// this triggers the first emission of currentQuizData$ to display
// and also triggers loading of the next item.
this.itemsBuffer$.pipe(take(1)).subscribe(() => this.nextItemSubj.next());

Upvotes: 0
