Johan Versteeg

Reputation: 1

How to unsubscribe from nested observable

Using the ngOnDestroy() method on our Angular component, we cancel HTTP requests that are still pending when the user leaves a page. On some pages we use a custom generic cache helper to avoid reloading data that has already been loaded.

import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { AsyncSubject } from "rxjs";

@Injectable()
export class HttpCacheHelper {
    public cache = new Map<string, AsyncSubject<any>>();

    public constructor(private readonly http: HttpClient) {        

    }

    public get<T>(url: string): AsyncSubject<T> {
        if (!this.cache.has(url)) {
            const subject = new AsyncSubject<T>();
            this.cache.set(url, subject);
            this.http.get(url)
                .subscribe((data:any) => {
                    subject.next(data as T);
                    subject.complete();
                });
        }
        return this.cache.get(url);
    }
}

If I unsubscribe from the AsyncSubject, my HTTP call is (of course) not cancelled. How can I accomplish that?

Upvotes: 0

Views: 991

Answers (5)

Bazinga

Reputation: 11194

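I would just go with publishReplay and refCount:

import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { Observable } from "rxjs";
import { publishReplay, refCount } from "rxjs/operators";

@Injectable()
export class HttpCacheHelper {
  public cache = new Map<string, Observable<unknown>>();

  public constructor(private readonly http: HttpClient) {}

  public get<T>(url: string): Observable<T> {
    if (!this.cache.has(url)) {
      // refCount() unsubscribes from the HTTP request (cancelling it)
      // once the last subscriber has unsubscribed.
      const obs = this.http.get<T>(url).pipe(
        publishReplay(1),
        refCount()
      );

      this.cache.set(url, obs);
    }

    return this.cache.get(url) as Observable<T>;
  }
}

// In the component: keep the subscription returned by get(...).subscribe(...)
// in a field (here mySubscription) and unsubscribe when the component is destroyed.
ngOnDestroy(): void {
  this.mySubscription.unsubscribe();
}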
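
To see the ref-counting behaviour in isolation, here is a minimal sketch that runs without Angular. It is only an illustration under a few assumptions: it uses shareReplay({ bufferSize: 1, refCount: true }) in place of publishReplay(1) plus refCount(), and pendingRequest and getCached are hypothetical stand-ins for http.get(url) and the helper's get method.

```typescript
import { Observable } from "rxjs";
import { shareReplay } from "rxjs/operators";

// Hypothetical stand-in for a pending HttpClient.get(): it never emits and
// records when its teardown runs (for a real request, the teardown aborts it).
const request = { cancelled: false };
const pendingRequest = new Observable<string>(() => {
  return () => {
    request.cancelled = true;
  };
});

const cache = new Map<string, Observable<unknown>>();

// Hypothetical cache lookup: one shared, ref-counted observable per URL.
function getCached<T>(url: string, load: (url: string) => Observable<T>): Observable<T> {
  let cached = cache.get(url) as Observable<T> | undefined;
  if (!cached) {
    cached = load(url).pipe(shareReplay({ bufferSize: 1, refCount: true }));
    cache.set(url, cached);
  }
  return cached;
}

const first = getCached("/api/items", () => pendingRequest).subscribe();
const second = getCached("/api/items", () => pendingRequest).subscribe();

first.unsubscribe(); // one subscriber is left, so the request stays alive
const cancelledAfterFirst = request.cancelled;

second.unsubscribe(); // no subscribers are left, so the source is torn down
const cancelledAfterSecond = request.cancelled;
```

With this shape a component only unsubscribes from what the helper returned; the request itself is cancelled once nobody is listening any more.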

Upvotes: 0

Jayme

Reputation: 1946

You could try unsubscribing this way as well.

Create a subject that signals when to unsubscribe:

private onDestroy$ = new Subject<void>();

Add this before any .subscribe() that you need to unsubscribe from

.pipe(takeUntil(this.onDestroy$))

Example:

this.anyService.getData()
  .pipe(takeUntil(this.onDestroy$))
  .subscribe((data: any) => {
    // Do work ...
  });

Then use ngOnDestroy() like this:

ngOnDestroy() {
  this.onDestroy$.next();
  this.onDestroy$.complete(); // Also complete the subject itself
}

When ngOnDestroy() runs and the subject emits, any subscription that uses takeUntil is automatically unsubscribed.
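
A small sketch of that behaviour outside Angular, in case it helps. The names source$ and state are hypothetical; source$ stands in for a request that is still pending. Note that takeUntil only tears down the chain it is piped into, so for the cache helper in the question it would have to sit on the HTTP observable itself (or the cached observable would have to be ref-counted) to cancel the request.

```typescript
import { Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

// Hypothetical pending source: never emits, records when it is torn down.
const state = { tornDown: false, completed: false };
const source$ = new Observable<number>(() => {
  return () => {
    state.tornDown = true;
  };
});

const onDestroy$ = new Subject<void>();

source$
  .pipe(takeUntil(onDestroy$))
  .subscribe({
    complete: () => {
      state.completed = true;
    },
  });

const tornDownBeforeDestroy = state.tornDown;

// What ngOnDestroy() does in the answer above.
onDestroy$.next();
onDestroy$.complete();
```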

Upvotes: 0

Ivan Tarskich

Reputation: 1527

In this case you don't have to unsubscribe from the HTTP source (see this question). Generally, you could use the take operator or flatMap for nested subscriptions.
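
A rough sketch of the flattening idea, using mergeMap (flatMap is a deprecated alias for it in RxJS). The names inner$ and state are hypothetical, and inner$ stands in for a pending request. Because the inner observable is flattened into one chain instead of being subscribed to separately, unsubscribing from the outer subscription also tears down the inner one.

```typescript
import { Observable, of } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Hypothetical pending inner request: never emits, records when it is torn down.
const state = { innerCancelled: false };
const inner$ = new Observable<string>(() => {
  return () => {
    state.innerCancelled = true;
  };
});

// One flattened chain instead of a subscribe() nested inside another subscribe().
const subscription = of("/api/items")
  .pipe(mergeMap(() => inner$))
  .subscribe();

const cancelledBefore = state.innerCancelled;

// Unsubscribing from the outer chain also unsubscribes from the inner observable.
subscription.unsubscribe();
const cancelledAfter = state.innerCancelled;
```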

Upvotes: 1

undefined

Reputation: 6844

You'll need to unsubscribe from the source:

public get<T>(url: string): AsyncSubject<T> {
    if (!this.cache.has(url)) {
        const subject = new AsyncSubject<T>();

        const subscription = this.http.get(url)
            .subscribe((data:any) => {
                subject.next(data as T);
                subject.complete();
            });

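        // Note: the cached value is no longer an AsyncSubject, so the Map's value type
        // and the return type of get() have to be widened to this shape.
        this.cache.set(url, {
          subscribe(observer) { return subject.subscribe(observer); },
          // Cancels the pending HTTP request for every consumer of this entry.
          unsubscribe() { subscription.unsubscribe(); }
        });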
    }

    return this.cache.get(url);
}

Upvotes: 0

Ismail

Reputation: 1316

Get the Subscription object and unsubscribe from it.

import { HttpClient } from "@angular/common/http";
import { Injectable, OnDestroy } from "@angular/core";
import { AsyncSubject, Subscription } from "rxjs";

@Injectable()
export class HttpCacheHelper implements OnDestroy {
    public cache = new Map<string, AsyncSubject<any>>();
    // Parent subscription that collects every pending request,
    // so a later request does not overwrite an earlier one.
    private readonly subscriptions = new Subscription();

    public constructor(private readonly http: HttpClient) {}

    public get<T>(url: string): AsyncSubject<T> {
        if (!this.cache.has(url)) {
            const subject = new AsyncSubject<T>();
            this.cache.set(url, subject);
            this.subscriptions.add(
                this.http.get<T>(url).subscribe((data) => {
                    subject.next(data);
                    subject.complete();
                })
            );
        }
        return this.cache.get(url) as AsyncSubject<T>;
    }

    // Cancels all requests that are still pending. Angular calls this
    // when the injector that provides the helper is destroyed.
    public ngOnDestroy(): void {
        this.subscriptions.unsubscribe();
    }
}

Upvotes: 0
