Reputation: 7609
I have the following code:
let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})
Observable.forkJoin([extension, urlRef, product, messageBackend]).subscribe((results:any) => {
  let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
});
My Get method looks like this:
public Get<T>(url, payload): Observable<T> {
  let additionalParam = "";
  if(payload) {
    additionalParam = "?";
    for (var key in payload) {
      if(additionalParam != "?") additionalParam += "&"
      additionalParam += key+"="+payload[key];
    }
  }
  let onGetCompleted = new Subject<any>();
  let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();
  this.http.get<T>(url+additionalParam, httpHeaders)
    .pipe(
      retry(MAX_RETRY)
      // TODO add error handler - JP
      // catchError()
    ).subscribe((data) => {
      onGetCompleted.next(data);
      onGetCompleted.unsubscribe();
    })
  return onGetCompleted$;
}
But the forkJoin never enters the subscription.
I also tried merge and concat, but they both return my observable and not the result.
What am I doing wrong?
EDIT:
I tried this:
Load(): Observable<any>{
  let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
  let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
  let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
  let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})
  extension.subscribe((res) => { console.log(res)});
  urlRef.subscribe((res) => { console.log(res)});
  product.subscribe((res) => { console.log(res)});
  messageBackend.subscribe((res) => { console.log(res)});
  let obs = Observable.forkJoin([extension, urlRef, product, messageBackend])
  obs.subscribe((results:any) => {
    let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
    console.log(apikvConfig);
    return;
    this._storeApp.dispatch(new StoreUseraction.SetExtensionsValue(apikvConfig));
  });
  return obs;
}
Every one of those logs does print a value.
Upvotes: 0
Views: 210
Reputation: 18663
First a preface, for the benefit of future readers (not specifically aimed at you, because I see this a lot):
90% of the time you do not need Subjects
In fact it's probably closer to 99% of the time, because that other 9% would be reserved for people writing integration libraries. Most end users of RxJS do not require Subjects in regular code.
Now that that is done, on to your specific question.
Skip to the bottom for the quick answer :).
Your Get method is attempting to reinvent the wheel, and it is introducing a bug into your expected behavior.
let onGetCompleted = new Subject<any>();
let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();
this.http.get<T>(url+additionalParam, httpHeaders)
  .pipe(
    retry(MAX_RETRY)
    // TODO add error handler - JP
    // catchError()
  ).subscribe((data) => {
    // *** Here is the problem ***
    onGetCompleted.next(data);
    onGetCompleted.unsubscribe();
  })
return onGetCompleted$;
As @martin astutely mentioned in the comments, forkJoin requires that each Observable emit at least once, and complete.
If you just returned the Observable you created with this.http.get<T>().pipe() directly from that method, your code would work as expected. Because of the indirection through the Subject, however, complete is never called on it, so forkJoin never emits.
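To make that rule concrete, here is a dependency-free toy model of it. This is not RxJS and not how forkJoin is actually implemented; the names Observer, Source, forkJoinModel, completes and neverCompletes are all made up for this sketch, and it ignores edge cases such as a source that completes without emitting.

```typescript
// Toy model only: these are NOT the RxJS types.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Source<T> = (observer: Observer<T>) => void;

// Remembers the last value of each source and emits them as one array,
// but only once every source has signalled complete.
function forkJoinModel<T>(sources: Source<T>[]): Source<T[]> {
  return (observer) => {
    const last: (T | undefined)[] = sources.map(() => undefined);
    let remaining = sources.length;
    sources.forEach((source, i) => {
      source({
        next: (value) => { last[i] = value; },
        complete: () => {
          remaining -= 1;
          if (remaining === 0) {
            observer.next(last as T[]);
            observer.complete();
          }
        },
      });
    });
  };
}

// Behaves like http.get(): one value, then complete.
const completes = (value: string): Source<string> => (observer) => {
  observer.next(value);
  observer.complete();
};

// Behaves like the Subject in the question: one value, then silence.
const neverCompletes = (value: string): Source<string> => (observer) => {
  observer.next(value);
};

const log: string[] = [];
const record = (label: string): Observer<string[]> => ({
  next: (values) => { log.push(`${label}: ${values.join(",")}`); },
  complete: () => { log.push(`${label}: complete`); },
});

forkJoinModel([completes("a"), completes("b")])(record("fixed"));
forkJoinModel([completes("a"), neverCompletes("b")])(record("broken"));

// Only the "fixed" entries are recorded; "broken" never reaches its observer.
console.log(log);
```

The second call mirrors your situation: every source delivers its value, which is why your debug logs print, but one of them never completes, so the joined result is never delivered.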
Each of your debug subscriptions works because each stream does emit a value; it just never completes. In RxJS, unsubscribe cancels the Observable. It does not send a complete or error signal: from a downstream perspective events simply stop coming, and the subscriber is told to clean up any resources it may have allocated. In general it is considered an anti-pattern to explicitly unsubscribe. See: https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87
You should probably be using operators like takeUntil to manage your RxJS subscriptions. As a rule of thumb, if you see two or more subscriptions being managed in a single component, you should wonder if you could be composing those better.
Note as well that there is some added danger to calling unsubscribe explicitly on a Subject, because it affects all current and future subscriptions to the Subject, effectively making it unusable.
Instead of dictating lifecycle logic in the Get method and potentially shooting yourself in the foot, let the stream manage itself.
public Get<T>(url, payload): Observable<T> {
  let additionalParam = "";
  if(payload) {
    additionalParam = "?";
    for (var key in payload) {
      if(additionalParam != "?") additionalParam += "&"
      additionalParam += key+"="+payload[key];
    }
  }
  // Return the HTTP Observable directly so that its complete signal reaches forkJoin
  return this.http.get<T>(url+additionalParam, httpHeaders)
    .pipe(retry(MAX_RETRY))
}
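A side note on the query string, unrelated to the forkJoin problem: the loop above concatenates keys and values as-is, so a value containing & or = would corrupt the URL. If that can happen in your data, a small helper along these lines could replace the loop. This is only a sketch; toQueryString is a name I made up, and unlike the loop above it returns an empty string rather than a bare "?" for an empty payload.

```typescript
// Hypothetical helper (not part of Angular or RxJS): builds "?k1=v1&k2=v2"
// with every key and value percent-encoded.
function toQueryString(payload?: Record<string, string | number | boolean>): string {
  if (!payload) {
    return "";
  }
  const pairs = Object.keys(payload).map(
    (key) => `${encodeURIComponent(key)}=${encodeURIComponent(String(payload[key]))}`
  );
  return pairs.length > 0 ? `?${pairs.join("&")}` : "";
}

console.log(toQueryString({ credential_key: "extensions" }));
console.log(toQueryString({ a: "x y", b: "1&2" }));
```

Inside Get you would then call this.http.get<T>(url + toQueryString(payload), httpHeaders), leaving the rest of the method unchanged.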
Now when you use it, it should behave as expected. The one caveat here is that in your current implementation you appear to be attempting to emulate an "only-once" behavior. If you need to preserve that behavior, you can do so through the use of operators. For instance, if I have two places that need the result of the same call I can do:
const onNext = (loc) => () => console.log(`At ${loc}`);
// pipe() is typed as returning a plain Observable, so cast the result
// (ConnectableObservable is exported from 'rxjs') to be able to call connect()
const source$ = this._http.Get(url, payload).pipe(
  tap(onNext('pipe')),
  publishLast() // This returns a ConnectableObservable
) as ConnectableObservable<any>;
// Sets up both subscriptions
source$.subscribe(onNext('first subscriber'));
source$.subscribe(onNext('second subscriber'));
// Executes the GET call by connecting the underlying subscriber to the source
source$.connect();
// Console
// At pipe
// At first subscriber
// At second subscriber
Since this looks like initialization code, which will only get called once by your application at start up, even that step may not be necessary.
Upvotes: 1