Reputation: 526
I have two services that can fetch a given object : an initial HTTP call to get the full list, and websocket notification to have live update of new elements.
I want to wire the two calls to the same observable so the display page has no knowledge of the backend.
There is the call mades :
private getInitialData(subject: Subject<any>) {
this._http.get<Array<any[]>>(`/call/to/backend`)
.flatMap(items => items)
.subscribe(subject);
}
private listenNewData(subject: Subject<any>) {
this.websocketService.listen('/some/ws/topic')
.subscribe(subject);
}
private initialize() {
const subject = new Subject<any>();
this.dataService.getInitialData(subject);
this.listenNewData(subject);
subject.subscribe(item => this.items.push(item))
}
When registering this way the subject it seems the subject is actually closed after the HTTP resolution, and no websocket notification get sent to the subject.
However, when registering using callbacks on subscribes functions :
private getInitialData(subject: Subject<any>) {
this._http.get<Array<any[]>>(`/call/to/backend`)
.flatMap(items => items)
.subscribe(item => subject.next(item));
}
private listenNewData(subject: Subject<any>) {
this.websocketService.listen('/some/ws/topic')
.subscribe(item => subject.next(item));
}
I have no issues and all items are correctly fetched.
I wanted to know why the two versions are different, and why, in the first case, the subject end up closed.
Thanks for any help.
Upvotes: 2
Views: 2372
Reputation: 96891
This is correct behavior. When you use .subscribe(subject)
you're subscribing to all three types of notifications. All Subjects are observers so it's the same as writing the following:
.subscribe(
v => subject.next(v),
err => subject.error(err),
() => subject.complete()
);
So if you use just .subscribe(item => subject.next(item))
you're passing only the next
notifications and no error
or complete
notifications so subject
is never completed.
Upvotes: 2
Reputation: 1258
Why don't you use forkjoin?
const apiCalls = [
this._http.get<Array<any[]>>(`/call/to/backend`),
this.websocketService.listen('/some/ws/topic')
];
forkJoin(apiCalls)
.subscribe((results: Array<any>) => {
// results[0] = call from api call 1
// results[1] = call from api call 2
// so combine results: combined = results[0] + results[1]
this.items.push(combined)
}, (error) => console.log(error));
This way, when both results come back you'll be in the subscribe where you can use both results instead of combining a subject.
Don't forgot to add the imports:
import {forkJoin} from 'rxjs/observable/forkJoin';
import {Observable} from 'rxjs/Observable';
Upvotes: 1