laprof

Reputation: 1354

Unsubscribe observable in an Angular service

I want to unsubscribe from an observable in an Angular service once a certain state is reached. The unsubscribe should happen inside the subscription callback. Unfortunately, the code below does not work.

@Injectable()
export class CartManagementUsecase {
  private unsubscribe = new Subject();

  public streamSession(): void {
    this.adapter
        .streamSession()
        .pipe(takeUntil(this.unsubscribe))
        .subscribe((session) => {
          if(session.session_state === SessionState.CLOSED) {
            this.unsubscribe.unsubscribe();
          }
    });
  }
}

Upvotes: 1

Views: 1386

Answers (3)

Deepak Kathait

Reputation: 1

First, try to avoid unsubscribing inside the service itself. If you still want to, see the code below.

I don't recommend this approach, but this is how you can do it.

private unsubscribe = new Subject<void>();

  public streamSession(): void {

    // end the previous subscription first: takeUntil reacts to an
    // emission from the notifier, so emit before completing
    this.unsubscribe.next();
    this.unsubscribe.complete();

    // create a new subject for the new subscription
    this.unsubscribe = new Subject<void>();

    this.adapter
        .streamSession()
        .pipe(takeUntil(this.unsubscribe))
        .subscribe((session) => {
          if (session.session_state === SessionState.CLOSED) {
            // stop receiving sessions once the session is closed
            this.unsubscribe.next();
            this.unsubscribe.complete();
          }
        });
  }

Upvotes: -1

Andrei

Reputation: 12206

I would recommend taking events until the CLOSED state arrives. No subject is required.

public streamSession(): void {
  this.adapter
      .streamSession()
      .pipe(takeWhile(session => session.session_state !== SessionState.CLOSED))
      .subscribe((session) => {
        // do something that is required
      });
}
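One detail worth knowing about takeWhile: by default the value that fails the predicate (here, the CLOSED session) is not passed to the subscriber. If the CLOSED session itself is needed, takeWhile accepts a second `inclusive` argument (available since RxJS 6.4, as far as I know). The sketch below is only an illustration: the `SessionState` enum, the `Session` interface and the simulated `sessions` array are hypothetical stand-ins for the types in the question.

```typescript
import { from } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// Hypothetical stand-ins for the types used in the question.
enum SessionState { OPEN = 'OPEN', CLOSED = 'CLOSED' }
interface Session { session_state: SessionState; }

// Simulated stream; the trailing OPEN entry should never be delivered.
const sessions: Session[] = [
  { session_state: SessionState.OPEN },
  { session_state: SessionState.OPEN },
  { session_state: SessionState.CLOSED },
  { session_state: SessionState.OPEN },
];

const isOpen = (s: Session): boolean => s.session_state !== SessionState.CLOSED;

// Default behaviour: the stream completes when the predicate first fails,
// and the CLOSED session is not emitted.
const exclusive: SessionState[] = [];
from(sessions)
  .pipe(takeWhile(isOpen))
  .subscribe((s) => exclusive.push(s.session_state));

// inclusive = true: the CLOSED session is emitted once, then the stream completes.
const inclusive: SessionState[] = [];
from(sessions)
  .pipe(takeWhile(isOpen, true))
  .subscribe((s) => inclusive.push(s.session_state));

console.log(exclusive);
console.log(inclusive);
```

In both variants the subscription ends on its own when the stream completes, so no extra Subject or manual unsubscribe call is involved.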

Upvotes: 4

Kanishk Anand

Reputation: 1702

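You need to emit a value on your subject for takeUntil to unsubscribe from the observable. In RxJS 6 and later, completing the subject without emitting does not stop the stream, so call next() before complete().

@Injectable()
export class CartManagementUsecase {
  private unsubscribe = new Subject<void>();

  public streamSession(): void {
    this.adapter
        .streamSession()
        .pipe(takeUntil(this.unsubscribe))
        .subscribe((session) => {
          if (session.session_state === SessionState.CLOSED) {
            // the emission is what makes takeUntil end the subscription
            this.unsubscribe.next();
            this.unsubscribe.complete();
          }
        });
  }
}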
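To see how takeUntil reacts to its notifier, here is a minimal sketch. It assumes RxJS 6 or later, where (to my knowledge) takeUntil ends the stream only when the notifier emits a value and ignores a notifier that completes without emitting. The names `source`, `stop`, `source2` and `stop2` are hypothetical stand-ins for the adapter stream and the service's subject.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Case 1: the notifier emits, so the subscription ends.
const source = new Subject<number>();
const stop = new Subject<void>();
const received: number[] = [];
let completed = false;

source.pipe(takeUntil(stop)).subscribe({
  next: (value) => received.push(value),
  complete: () => { completed = true; },
});

source.next(1);
stop.next();      // the emission makes takeUntil complete the stream
stop.complete();  // tidy up the notifier itself
source.next(2);   // not delivered, the subscriber is already done

// Case 2: the notifier only completes, so the stream keeps going (RxJS 6+).
const source2 = new Subject<number>();
const stop2 = new Subject<void>();
const received2: number[] = [];

source2.pipe(takeUntil(stop2)).subscribe((value) => received2.push(value));

stop2.complete(); // no emission, takeUntil keeps mirroring the source
source2.next(3);  // still delivered

console.log(received, completed);
console.log(received2);
```

Calling unsubscribe() on the subject, as in the question, is a different operation again: it closes the subject itself, and later calls on it throw an ObjectUnsubscribedError.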

Upvotes: 0
