Reputation: 11849
In my Angular 2.x application, I have a component which subscribes to an observable exposed by a service. The observable data service is currently implemented with polling HTTP requests but that will change to a reactive WS implementation. I'd like to keep the polling out of the components so the question is: how can I take an action, like HTTP polling via setInterval
, once I have subscribers? And, how can I take an action, like clearInterval
when there are no more subscribers?
The view is asynchronously updated:
<div *ngFor="let headline of headlines$ | async">
<md-list-item (click)="onSelect(headline)">
<h4 md-line>{{headline.title}}</h4>
</md-list-item>
<md-divider></md-divider>
</div>
The component gets the headline observable from the service:
export class HeadlinesComponent implements OnInit, OnDestroy {
constructor(
private _headlineService: HeadlineService,
) {
}
headlines$: Observable<Headline[]>;
ngOnInit() {
this.headlines$ = this._headlineService.headlines$;
}
}
And the service is implemented as an observable data store:
@Injectable()
export class HeadlineService {
constructor(
private _http: Http) {
_init();
}
private _headlines$: BehaviorSubject<Headline[]> =
new BehaviorSubject<Headline[]>([]);
get headlines$(): Observable<Headline[]> {
return this._headlines$.asObservable();
}
private _store: {
headlines: Headline[]
} = { headlines: [] };
private _save(headlines: Headline[]) {
this._store.headlines = headlines;
this._headlines$.next(Object.assign({}, this._store).headlines);
}
}
and the store is updated via HTTP:
private _interval;
private _init() {
let self = this;
this._interval = setInterval(function(){ self._loadHeadlines();}, 5000);
}
private _loadHeadlines(): Observable<Headline[]> {
let self = this;
let observable: Observable<Headline[]> = this._http
.get(url, options)
.map(response => response.json()._embedded.headlines)
.share();
observable.subscribe(
headlines => self._save(headlines)
);
return observable;
}
As implemented, the headline service starts polling on construction instead of delaying until a subscriber is present. To get the lifecycle correct, I could expose the service's _loadHeadlines
functionality and start polling when the component's ngOnInit
is triggered (and stop in ngOnDestroy
) but that exposes the service implementation details to the component (these details will change when I move to a web-sockets implementation) and I'd need have an accurate count of the number of subscribers.
So, is there a nice RxJs-ish way to solve this?
Upvotes: 0
Views: 1747
Reputation: 9425
Observables are lazy and only start working when subscribed to. When you stop listening they will unsubscribe and clean up.
What you are looking for is multicast behaviour with refcounting to init upon first subscription and only when nobody is subscribed anymore to clean up. This is done using the .share()
operator.
Upvotes: 1
Reputation: 8855
I've not used RxJS in Angular but from your example code it appears you could .create
an observable that starts the interval to poll and return the clear interval function which will be called when it's unsubscribed from.
/*
Increment value every 1s, emit even numbers.
*/
const evenNumbers = Rx.Observable.create(function(observer) {
let value = 0;
const interval = setInterval(() => {
if(value % 2 === 0){
observer.next(value);
}
value++;
}, 1000);
return () => clearInterval(interval);
});
//output: 0...2...4...6...8
const subscribe = evenNumbers.subscribe(val => console.log(val));
//unsubscribe after 10 seconds
setTimeout(() => {
subscribe.unsubscribe();
}, 10000);
The example above is from learnrxjs.io
Upvotes: 0