Ctfrancia
Ctfrancia

Reputation: 1588

Angular8 - observable not unsubscribing on ngOnDestroy

I have a chat app, and let's say that I am having a chat with Jane. When I have the chat screen open with Jane and she sends me a message I receive it correctly. However, if I close the message, then open it again and Jane sends me a message I receive two copies of it. When I close the screen and open it up again and Jane sends a message I get three copies of it.

It appears that when closing the screen the observable is not being properly unsubscribed from. When I check my back end, there is only one message being sent at a time, never multiple.

chat.component.ts

export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
  private _messages: Subscription;
  private message = ''; //for the [(ngModel)]


  public ngOnInit(): void {
    // below I am subscribing to the observable
    this._messages = this.chatService.getMessages$().subscribe((message: Message[]): void => {
      console.lot('msg received', message);
      this.messages$ = message;
    });
  }

  public async sendMessage(): Promise<void> {
    const message = await this._bundleMessage(); // formats the message
    this.chatService.sendMessage(message);
    this.message = '';
  }

  public ngOnDestroy(): void {
    this._messages.unsubscribe();
  }
}

chat.service.ts

  private messages: Message[] = new Array<Message>();
  private readonly bsMessages$: BehaviorSubject<Message[]> = new BehaviorSubject<Message[]>(this.messages);

  public getMessages$(): Observable<Message[]> {
    return this.bsMessages$.asObservable();
  }

  public async setAsActiveConversation(user: UserData): Promise<void>{
    this._incomingMessages = this.receiveMessages().subscribe();
    const { id } = user;
    this.activeConversation = await this.conversationStorage.get(id);
    console.log('after setting the active conversation', this.activeConversation);
    if (this.activeConversation === null) {
      await this._newConversation(user);
      await this.conversationStorage.set(id, this.activeConversation);
      this.messages = this.activeConversation.messages;
      this.bsMessages$.next(this.messages);
    } else {
      await this.activeConversation.messages.forEach((msg: Message): void => {
        if (msg.image) {
          msg.image = this.webView.convertFileSrc(msg.image);
        } else {
          msg.image = '';
        }
        // console.log('after the if statement', this.activeConversation);
        this.messages = this.activeConversation.messages;
        this.bsMessages$.next(this.messages);
      });
    }
  }


  public async sendMessage(msg: Message): Promise<void> {
    this.socket.emit('message', msg); //emits to server
    this.activeConversation.messages.push(msg); //adds message to array
    this.conversationStorage.set(this.activeConversation.id, this.activeConversation); //saves message locally
    this.bsMessages$.next(this.messages); //signals to update
  }

If you need more information let me know, but I think it boils down to this

Edit: added more code to help clarify some of the comments. Also, when I log the results of the array see above I can see that there are duplicates being added to the array. it's not just visually. When it sends 2+ messages I see that array length getting added by 2+

Upvotes: 1

Views: 340

Answers (2)

Scott Byers
Scott Byers

Reputation: 3205

While you can store the return of .subscribe() in a local variable and ultimately call .unsubscribe() yourself, that's generally cumbersome and error-prone. Instead, consider this approach:

Custom pipe:

// RxJs pipeable operator for subscribing until component fires onDestroy
export function takeUntilComponentDestroyed(component: OnDestroy): MonoTypeOperatorFunction<any> {
  const componentDestroyed = (comp: OnDestroy) => {
    const oldNgOnDestroy = comp.ngOnDestroy;
    const destroyed$ = new ReplaySubject<void>(1);
    comp.ngOnDestroy = () => {
      oldNgOnDestroy.apply(comp);
      destroyed$.next(undefined);
      destroyed$.complete();
    };
    return destroyed$;
  };

  return pipe(
    takeUntil(componentDestroyed(component))
  );
}

This defines a custom pipe for RxJs that will allow us to take until the specified component is destroyed. That component must implement OnDestroy like so:

Class definition:

export class SomeComponent implements OnInit, OnDestroy { 
    ... 
    // OnDestroy interface contract requires:
    ngOnDestroy() { }
    ...
}

Now that you have a component setup to implement OnDestroy, you can use our new custom Pipe to automatically take until it's destroyed like so:

observable$.pipe(
    takeUntilComponentDestroyed(this)
).subscribe(value => {
    console.log("Value emits until component destroyed", value)
});

Now you can use that pipe anywhere you want to listen without worrying about unsubscribing!

Upvotes: 0

Ctfrancia
Ctfrancia

Reputation: 1588

Error was here:

  public async setAsActiveConversation(user: UserData): Promise<void>{
    this._incomingMessages = this.receiveMessages().subscribe(); //<--- here
    const { id } = user;

I wasn't unsubscribing from this which was causing the memory leaks!

in the chat.component.ts I added


  public ngOnDestroy(): void {
    this.chatService.setLastMessage();
    this._messages.unsubscribe();
    this.chatService._incomingMessages.unsubscribe(); // <---
  }

Upvotes: 1

Related Questions