Gesh
Gesh

Reputation: 575

RxJS Observable with Subject, polling via timer and combineLatest does not fire

I wrote a function to do polling on an API that is also able to do pagination. Herefor the pagination is done using a Subject Observable and polling is done with the timer method (I also tried interval with the same result).

Here is my code:

  getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
    let params: URLSearchParams = new URLSearchParams();

    return Observable
      .timer(0, 5000)
      .combineLatest(
        pagination,
        (timer, pagination) => pagination
      )
      .startWith({offset: 0, limit: 3})
      .switchMap(pagination => {
        params.set('skip', pagination.offset.toString());
        params.set('limit', pagination.limit.toString());
        return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
      })
      .map(response => response.json() as ListResult<Item>)
      .catch(this.handleError);
  }

The expected behavior would be: HTTP request is fired every 5 seconds AND when the user changes the page.

This is what happens: First HTTP request is fired, but then no other request is sent to the server UNTIL pagination is used. After pagination is used the first time, the polling starts working too.

It's the first time I'm using Observables so I'm pretty sure I missed something, but I can't see what it might be.

I also tried this approach (maybe it was missing the timer counter in startWith), but it didn't change anything.

[...]
  .combineLatest(
    pagination
  )
  .startWith([0, {offset: 0, limit: 3}])
[...]

Upvotes: 4

Views: 2323

Answers (2)

Aniruddha Das
Aniruddha Das

Reputation: 21688

Make sure you are following below steps

Creating Observable Data

You have an Observable variable and you are pushing data by observable.next(value);

Putting data into a Subject or Behavior Subject

Where you want to get the data you are declaring a Subject type variable and access it by subscribing to it.

private val: Subject = YourObservable;
private uiValue: string;

val.subscribe((val) => {
   uiVal = val;
});

Now you can use the uiVal anywhere which will update in each inteval.

You can test both the pieces separately by just making the console.log to make sure everything is working fine.

Upvotes: -1

martin
martin

Reputation: 96899

The combineLatest() operator requires all source Observables to emit at least one item.

Your demo makes only one request because you're using .startWith(). The combineLatest() never emits because the pagination is a Subject and it probably never emits any item.

So one option is to move .startWith():

.combineLatest(
  pagination.startWith({offset: 0, limit: 3}),
  (timer, pagination) => pagination
)

But maybe this doesn't help you much because you're ignoring all items coming from the timer() and you're using just pagination. So maybe you could use just merge() to refresh the list from one of the sources. Then the timer() independently increases the offset.

Observable
  .timer(0, 1000)
  .map(i => { return {offset: i*3, limit: 3}})
  .merge(pagination.startWith({offset: 0, limit: 3}))
  .switchMap(pagination => {
    return Observable.of(pagination).delay(200)
  })
  .subscribe(val => console.log(val));

Upvotes: 2

Related Questions