Reputation: 6441
I have a screen that needs to present constantly changing data from an API. On the observable returned by my API call, I use repeatWhen() to implement the polling. Also, when I detect certain value changes in the incoming data, I need to make an additional API call. My current implementation works like this:
let queueStatusObservable = this.enumService.getOptions('WorkQueueStatus');
let queueItemObservable = this.enumService.getOptions('WorkItemStatus');
// retrieve status every 3000ms
let summaryObservable = this.service
  .getSummary(this.id)
  .repeatWhen(completed => completed.delay(3000));
this.summaryModel$ = Observable.combineLatest(queueStatusObservable, queueItemObservable, summaryObservable)
  .map(results => {
    let workQueueStatusList = results[0];
    let workItemStatusList = results[1];
    // convert empty {} object to null
    let summary = results[2].status > -1 ? results[2] : null;
    let model = {
      workQueueStatusList: workQueueStatusList,
      workItemStatusList: workItemStatusList,
      summary: summary
    };
    return model;
  });
// fetch validation messages every time the count changes
this.validationMessages$ = Observable.merge(
  summaryObservable.first((value, index) => value.statusCount['Invalid'] > 0),
  summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Invalid'] !== value[1].statusCount['Invalid'])
).flatMap(i => this.service.getMessages()); // arrow function, so `this` is the component
// fetch errors every time the count changes
this.errorMessages$ = Observable.merge(
  summaryObservable.first((value, index) => value.statusCount['Error'] > 0),
  summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Error'] !== value[1].statusCount['Error'])
).flatMap(i => this.service.getErrors());
There are no subscribe() calls on the observables, because those happen in my Angular templates using the async pipe, like so:
<div *ngIf="summaryModel$|async; let summaryModel">
I expected to get one API call every 3 seconds, and that all statements working on summaryObservable would simply be triggered by that API call's response.
It does not seem to work that way. When I open the Network tab in Chrome, I see 4 API calls every 3 seconds. Is this how RxJS is supposed to work, or am I using RxJS the wrong way?
The solution I ended up with is:
let summaryObservable = this.service
  .getSummary(this.id)
  .repeatWhen(completed => completed.delay(3000))
  .shareReplay();
With the share operators, I don't need to manage connecting and disconnecting myself, and shareReplay() replays already-emitted values to subscribers that join later.
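To make the replay behaviour concrete, here is a minimal, dependency-free sketch of the idea. It is not RxJS itself: shareWithReplay, push, and the buffer size of 1 are hypothetical names and assumptions chosen for illustration. It only shows one upstream subscription being shared while a late subscriber still receives the most recent value.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical helper (not an RxJS API): shares ONE upstream subscription
// and replays the last `bufferSize` values to subscribers that join later.
function shareWithReplay<T>(
  subscribeUpstream: (emit: Listener<T>) => void,
  bufferSize: number
) {
  const buffer: T[] = [];
  const listeners: Listener<T>[] = [];
  let started = false;

  return {
    subscribe(listener: Listener<T>): void {
      // Replay what this subscriber missed before it joined.
      buffer.forEach(value => listener(value));
      listeners.push(listener);

      // Only the first subscriber triggers the upstream subscription.
      if (!started) {
        started = true;
        subscribeUpstream(value => {
          buffer.push(value);
          if (buffer.length > bufferSize) buffer.shift();
          listeners.forEach(l => l(value));
        });
      }
    },
  };
}

let upstreamSubscriptions = 0; // stands in for "polling chains started"
let push: Listener<number> = () => {};

const polled = shareWithReplay<number>(emit => {
  upstreamSubscriptions++;
  push = emit; // keep a handle so the example can simulate poll responses
}, 1); // buffer size 1 is an assumption for this sketch

const early: number[] = [];
const late: number[] = [];

polled.subscribe(v => early.push(v));
push(1);
push(2);
polled.subscribe(v => late.push(v)); // immediately receives the replayed 2
push(3);

console.log(upstreamSubscriptions, early, late);
```

Here the early subscriber sees every simulated response, the late one starts from the most recent buffered value, and the upstream is subscribed to only once.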
Upvotes: 5
Views: 1915
Reputation: 39192
You are using the same summaryObservable 5 different times. 2 of them are single-use first() calls, which leaves you with 3 ongoing uses. Each subscription to a cold observable runs its own copy of the polling chain, so each use makes its own API calls.
When you want to reuse the same observable multiple times, you need to use something like publish() to "share" a single subscription to the observable among all the uses.
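The difference can be sketched without RxJS at all. The helpers below (coldSource, publishLike) are hypothetical stand-ins written for this illustration, not library APIs, and the counter stands in for HTTP requests. The sketch assumes a cold source whose producer runs once per subscriber.

```typescript
type Observer<T> = (value: T) => void;

interface Source<T> {
  subscribe(observer: Observer<T>): void;
}

// Hypothetical cold source: the producer runs again for EVERY subscriber.
function coldSource<T>(producer: (observer: Observer<T>) => void): Source<T> {
  return { subscribe: (observer) => producer(observer) };
}

// Hypothetical multicast wrapper, loosely analogous to publish() + connect():
// subscribers register first, then connect() subscribes to the source once.
function publishLike<T>(source: Source<T>) {
  const observers: Observer<T>[] = [];
  return {
    subscribe(observer: Observer<T>): void {
      observers.push(observer);
    },
    connect(): void {
      source.subscribe(value => observers.forEach(o => o(value)));
    },
  };
}

let apiCalls = 0; // counts simulated requests

const summary = coldSource<{ invalid: number }>(observer => {
  apiCalls++; // stands in for one getSummary() request
  observer({ invalid: 2 });
});

// Three independent subscriptions: three simulated requests.
summary.subscribe(() => {});
summary.subscribe(() => {});
summary.subscribe(() => {});
const coldCalls = apiCalls;

// Three subscribers behind one shared subscription: one simulated request.
apiCalls = 0;
const received: number[] = [];
const shared = publishLike(summary);
shared.subscribe(v => received.push(v.invalid));
shared.subscribe(v => received.push(v.invalid));
shared.subscribe(v => received.push(v.invalid));
shared.connect();
const sharedCalls = apiCalls;

console.log(coldCalls, sharedCalls); // prints: 3 1
```

All three shared subscribers still receive the value, but the producer (the simulated API call) runs only once.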
Try something like this:
let summaryObservable = this.service
  .getSummary(this.id)
  .repeatWhen(completed => completed.delay(3000))
  .publish(); // creates a ConnectableObservable
// ...
// all the rest of your code goes here
// ...
// Finally "connect" the observable to start the polling
// that will be shared by all the other usages
this.summaryConnection = summaryObservable.connect();
And make sure you define ngOnDestroy() to stop the polling when your component is destroyed:
ngOnDestroy() {
  this.summaryConnection.unsubscribe(); // .dispose() if using an older version of RxJS
}
Upvotes: 3