Crocsx
Crocsx

Reputation: 7609

forkJoin don't trigger response

I have the following code :

    let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
    let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
    let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
    let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})

    Observable.forkJoin([extension, urlRef, product, messageBackend]).subscribe((results:any) => {
        let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
    });

my getMethos is like this :

public Get<T>(url, payload): Observable<T> {
    let additionalParam = "";
    if(payload) {
        additionalParam = "?";
        for (var key in payload) {
            if(additionalParam != "?") additionalParam += "&"
            additionalParam += key+"="+payload[key];
        }
    }

    let onGetCompleted = new Subject<any>();
    let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();

    this.http.get<T>(url+additionalParam, httpHeaders)
        .pipe(
            retry(MAX_RETRY)

            // TODO add error handler - JP
            // catchError()
        ).subscribe((data) => {
            onGetCompleted.next(data);
            onGetCompleted.unsubscribe();
        })
    return onGetCompleted$;
}

But the forkJoin never enter in the subscribtion.

I tried to merge, and also concat, but they booth return my observable and not the result.

What am I doing wrong?

EDIT :

I did :

Load(): Observable<any>{
    let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
    let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
    let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
    let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})

    extension.subscribe((res) => { console.log(res)});
    urlRef.subscribe((res) => { console.log(res)});
    product.subscribe((res) => { console.log(res)});
    messageBackend.subscribe((res) => { console.log(res)});


    let obs = Observable.forkJoin([extension, urlRef, product, messageBackend])

    obs.subscribe((results:any) => {
        let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
        console.log(apikvConfig);
        return;
        this._storeApp.dispatch(new StoreUseraction.SetExtensionsValue(apikvConfig));
    });

    return obs;
}

every log do return a value

Upvotes: 0

Views: 210

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

First a preface, for the benefits of future readers (not specifically aimed at you because I see this a lot):

90% of the time you do not need Subjects

In fact its probably closer to 99% of the time just because that other 9% would be reserved for people writing integration libraries. Most end users of RxJS do not require Subjects in regular code.

Now that that is done onto your specific question.

Skip to the bottom for the quick answer :).

Your Get method is attempting to reinvent the wheel and it is introducing a bug into your expected behavior.

let onGetCompleted = new Subject<any>();
let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();

this.http.get<T>(url+additionalParam, httpHeaders)
    .pipe(
        retry(MAX_RETRY)

        // TODO add error handler - JP
        // catchError()
    ).subscribe((data) => {
        // *** Here is the problem ***
        onGetCompleted.next(data);
        onGetCompleted.unsubscribe();
    })
return onGetCompleted$;

As @martin astutely mentioned in the comments, forkJoin requires that each Observable emit at least once, and complete.

If you just returned the Observable you created with this.http.get<T>().pipe() directly from that method, your code would work as expected. Because of the indirection through the Subject however, the complete is never being called, so forkJoin is never executed.

Each of your debug subscribes work, because each stream is emitting an event, it just never completes. unsubscribe in the context of RxJS means cancel the Observable, it does not send a complete or error signal, from a down stream perspective the stream just stops, it never completes, events just stop coming and the subscriber is told to clean up any resources it may have allocated. In general it is considered an anti-pattern to explicitly unsubscribe. See: https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87

You should probably be using operators like takeUntil to manage your RxJS subscriptions. As a rule of thumb, if you see two or more subscriptions being managed in a single component, you should wonder if you could be composing those better.

Note as well that there is some added danger to calling unsubscribe explicitly on a Subject because it affects all current and future subscriptions to the Subject effectively making it unusable.

The Fix

Instead of dictating lifecycle logic in the Get method and potentially shooting yourself in the foot, let the stream manage itself.

public Get<T>(url, payload): Observable<T> {
    let additionalParam = "";
    if(payload) {
        additionalParam = "?";
        for (var key in payload) {
            if(additionalParam != "?") additionalParam += "&"
            additionalParam += key+"="+payload[key];
        }
    }


    return this.http.get<T>(url+additionalParam, httpHeaders)
        .pipe(retry(MAX_RETRY))
}

Now when you use it, it should behave as expected. The one caveat here is that in your current implementation you appear to be attempting to emulate an "only-once" behavior. If you need to preserve that behavior, you can do so through the use of operators. For instance, if I have two places that need the result of the same call I can do:

const onNext = (loc) => () => console.log(`At ${loc}`);

const source$ = this._http.Get(url, payload).pipe(
  tap(onNext('pipe')),
  publishLast() // This returns a ConnectableObservable
);


// Sets up both subscriptions
source$.subscribe(onNext('first subscriber'));
source$.subscribe(onNext('second subscriber'));
// Executes the GET call by connecting the underlying subscriber to the source
source$.connect();

// Console
// At pipe
// At first subscriber
// At second subscriber

Since this looks like initialization code though, which will only get called once by your application at start up though, even that step may not be necessary.

Upvotes: 1

Related Questions