Reputation: 1588
I have a chat app. Say I'm chatting with Jane: while the chat screen with Jane is open and she sends me a message, I receive it correctly. However, if I close the screen, open it again, and Jane sends a message, I receive two copies of it. If I close and reopen the screen once more and Jane sends another message, I get three copies.
It appears that when closing the screen the observable is not being properly unsubscribed from. When I check my back end, there is only one message being sent at a time, never multiple.
export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
private _messages: Subscription;
private message = ''; //for the [(ngModel)]
public ngOnInit(): void {
// below I am subscribing to the observable
this._messages = this.chatService.getMessages$().subscribe((message: Message[]): void => {
console.log('msg received', message);
this.messages$ = message;
});
}
public async sendMessage(): Promise<void> {
const message = await this._bundleMessage(); // formats the message
this.chatService.sendMessage(message);
this.message = '';
}
public ngOnDestroy(): void {
this._messages.unsubscribe();
}
}
private messages: Message[] = new Array<Message>();
private readonly bsMessages$: BehaviorSubject<Message[]> = new BehaviorSubject<Message[]>(this.messages);
public getMessages$(): Observable<Message[]> {
return this.bsMessages$.asObservable();
}
public async setAsActiveConversation(user: UserData): Promise<void>{
this._incomingMessages = this.receiveMessages().subscribe();
const { id } = user;
this.activeConversation = await this.conversationStorage.get(id);
console.log('after setting the active conversation', this.activeConversation);
if (this.activeConversation === null) {
await this._newConversation(user);
await this.conversationStorage.set(id, this.activeConversation);
this.messages = this.activeConversation.messages;
this.bsMessages$.next(this.messages);
} else {
await this.activeConversation.messages.forEach((msg: Message): void => {
if (msg.image) {
msg.image = this.webView.convertFileSrc(msg.image);
} else {
msg.image = '';
}
// console.log('after the if statement', this.activeConversation);
this.messages = this.activeConversation.messages;
this.bsMessages$.next(this.messages);
});
}
}
public async sendMessage(msg: Message): Promise<void> {
this.socket.emit('message', msg); //emits to server
this.activeConversation.messages.push(msg); //adds message to array
this.conversationStorage.set(this.activeConversation.id, this.activeConversation); //saves message locally
this.bsMessages$.next(this.messages); //signals to update
}
If you need more information let me know, but I think it boils down to this.
Edit: I added more code to address some of the comments. Also, when I log the array (see above), I can see that duplicates are actually being added to it, so it's not just a display issue. When 2+ copies of a message arrive, the array length grows by 2+.
Upvotes: 1
Views: 340
Reputation: 3205
While you can store the return value of .subscribe() in a local variable and ultimately call .unsubscribe() yourself, that's generally cumbersome and error-prone. Instead, consider this approach:
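import { OnDestroy } from '@angular/core';
import { MonoTypeOperatorFunction, ReplaySubject, pipe } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// RxJS pipeable operator that keeps a subscription alive until the component's ngOnDestroy fires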
export function takeUntilComponentDestroyed(component: OnDestroy): MonoTypeOperatorFunction<any> {
const componentDestroyed = (comp: OnDestroy) => {
const oldNgOnDestroy = comp.ngOnDestroy;
const destroyed$ = new ReplaySubject<void>(1);
comp.ngOnDestroy = () => {
oldNgOnDestroy.apply(comp);
destroyed$.next(undefined);
destroyed$.complete();
};
return destroyed$;
};
return pipe(
takeUntil(componentDestroyed(component))
);
}
This defines a custom pipeable operator for RxJS that wraps takeUntil, so the subscription ends when the specified component is destroyed. That component must implement OnDestroy like so:
export class SomeComponent implements OnInit, OnDestroy {
...
// OnDestroy interface contract requires:
ngOnDestroy() { }
...
}
Now that you have a component set up to implement OnDestroy, you can use the new custom operator to automatically stop taking values once it's destroyed, like so:
observable$.pipe(
takeUntilComponentDestroyed(this)
).subscribe(value => {
console.log("Value emits until component destroyed", value)
});
Now you can use that operator anywhere you subscribe inside a component, without having to unsubscribe manually!
Upvotes: 0
Reputation: 1588
Error was here:
public async setAsActiveConversation(user: UserData): Promise<void>{
this._incomingMessages = this.receiveMessages().subscribe(); //<--- here
const { id } = user;
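I wasn't unsubscribing from this subscription. Each call to setAsActiveConversation() added another subscription to receiveMessages(), so after reopening the chat screen every incoming message was handled once per leftover subscription. That is what caused the duplicates (and a memory leak).
In chat.component.ts I added: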
public ngOnDestroy(): void {
this.chatService.setLastMessage();
this._messages.unsubscribe();
this.chatService._incomingMessages.unsubscribe(); // <---
}
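For what it's worth, the same leak could also be guarded against inside the service, so the component doesn't have to reach into it. Below is a minimal, self-contained sketch of that idea, not the actual app code: FakeSocket, ChatServiceSketch, openLeaky, openGuarded and countCopies are all hypothetical names, and FakeSocket is only a stand-in for whatever receiveMessages() wraps. It models opening the chat screen three times and then receiving a single message.

```typescript
// Hypothetical stand-in for the socket-backed stream behind receiveMessages().
type Listener = (msg: string) => void;
type Handle = { unsubscribe: () => void };

class FakeSocket {
  private listeners = new Set<Listener>();

  // Registers a listener and returns a handle that removes it again.
  subscribe(listener: Listener): Handle {
    this.listeners.add(listener);
    return { unsubscribe: () => { this.listeners.delete(listener); } };
  }

  // Delivers one incoming message to every registered listener.
  emit(msg: string): void {
    this.listeners.forEach((listener) => listener(msg));
  }
}

// Hypothetical, stripped-down model of the chat service.
class ChatServiceSketch {
  readonly messages: string[] = [];
  private socket: FakeSocket;
  private incoming: Handle | null = null;

  constructor(socket: FakeSocket) {
    this.socket = socket;
  }

  // Leaky: every call adds one more listener and never removes the old one.
  openLeaky(): void {
    this.incoming = this.socket.subscribe((msg) => { this.messages.push(msg); });
  }

  // Guarded: drop the previous listener (if any) before adding a new one.
  openGuarded(): void {
    this.incoming?.unsubscribe();
    this.incoming = this.socket.subscribe((msg) => { this.messages.push(msg); });
  }
}

// Opens the chat `visits` times, receives one message, and reports how many copies were stored.
function countCopies(open: (service: ChatServiceSketch) => void, visits: number): number {
  const socket = new FakeSocket();
  const service = new ChatServiceSketch(socket);
  for (let i = 0; i < visits; i++) {
    open(service);
  }
  socket.emit('hello');
  return service.messages.length;
}

const leakyCopies = countCopies((service) => service.openLeaky(), 3);
const guardedCopies = countCopies((service) => service.openGuarded(), 3);
console.log({ leakyCopies, guardedCopies });
```

With three opens, the leaky version stores the message once per leftover listener (three copies), while the guarded version keeps a single listener and stores it once. In the real service the equivalent guard would be calling unsubscribe() on the previous _incomingMessages before subscribing again.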
Upvotes: 1