Majesty
Majesty

Reputation: 1915

RxJs: how to maintain one WebSocket connection across multiple components?

I have a chat application that consists of multiple components (e.g. one components holds a group list, another one contains messages, etc.), and when one component is opened another one must be closed.

So here is a service that I'm using to provide a webSocket connection:

@Injectable()
export class ChatConnectionService {

  private _connection: Subject<any>;

  constructor(private window: WindowAbstract) { }

  public closeConnection(): void {
    this.connection.complete();
  }

  public send(msg: ChatRequest): void {
    this.connection.next(msg);
  }

  public messages(): Subject<any> {
    return this.connection;
  }

  private get connection(): Subject<any> {
    if (!this._connection || this._connection.closed) {
      this._connection = webSocket(`https/myapp/chat`);
    }

    return this._connection;
  }
}

And here is a usage example in some.component.ts:

  private messages$: Subscription;

  ngOnInit(): void {
    this.messages$ = this.chatConnectionService.messages()
      .subscribe(m => this.handleInboundMessage(m));
  }

  ngOnDestroy() {
    this.messages$.unsubscribe();
  }

Now the problem is that when I switch between components, once it gets unsubscribed when an old component being destroyed, a new subscription does not seem to be working, it does not send or received messages and does not produce any error as well.

But when I remove unsubscribe line it starts working, but keeps receiving messages using the old subscription as well, which is not what I really want.

So summarizing it all - I need to maintain one connection but having a possibility for multiple components to subscribe and unsubscribe as needed, is it possible to accomplish it with RxJs? Or what am I doing wrong? Any suggestion will be appreciated, thanks!

Upvotes: 2

Views: 4561

Answers (2)

Joshua McCarthy
Joshua McCarthy

Reputation: 1852

The issue is that you are giving components direct reference to the WebSocketSubject being returned from webSocket('https/myapp/chat'). If you invoke the .unsubscribe() method on WebSocketSubject, it closes the channel, which is why no other components detect messages.

Rather than doing getters, declare a public observable that is piped from the private WebSocketSubject.

@Injectable()
export class ChatConnectionService {

  public messages$ = Observable<any>; // Replace 'any' with WS response type.
  private connection: WebSocketSubject;

  constructor(private window: WindowAbstract) { 
    this.connection = webSocket(`https/myapp/chat`);
    this.messages$ = this.connection.asObservable();
  }

}

Your component will now be unsubscribing from the observable, rather than invoking the WebSocketSubject to unsubscribe from the channel.


Update After further review of the WebSocketSubject being instantiated by webSocket() I realized that .asObservable() isn't the best use case for your situation. Even if your components unsubscribe from the observable, the Subject is still receiving new data from the open WS channel.

this.messages$ = this.connection.multiplex(
  () => { 'send message to WS server to start' }, 
  () => { 'send message to WS server to stop' }, 
  (message) => true // filter messages by condition
);

You can declare an observable from your WebSocketSubject using the multiplex() method. It takes three parameters:

  1. Function that declares the start message for WS.
  2. Function that declares the end message for WS.
  3. Function that filters messages

Now, whenever any component subscribes to messages$, it will trigger the start message, opening the channel for subscribers. When all observables unsubscribe from messages$, it sends the close signal halting any further messages.

This saves a lot of boilerplate for maintaining a singleton state for your WS.

Reference: https://rxjs.dev/api/webSocket/WebSocketSubject

Upvotes: 3

Majesty
Majesty

Reputation: 1915

I will just leave it here for someone who endup in the same situation.

You need at least one subscription running all the time so it doesn't close connection when you unsubscribe.

So here is what I did:

@NgModule({
  imports: [
     ...
  ],
  providers: [
    {
        provide: APP_INITIALIZER,
        multi: true,
        // subscribing establishes a connections
        useFactory: (chat: ChatConnectionService) => () => chat.messages$.subscribe(),
        deps: [ChatConnectionService]
    }
  ]
})

export class AppModule {
}

Not so obvious pitfall here is that unsubscribing from WebSocketSubject and from an Observable, that was created by subscribing to the subject, have the same effect - it closes an active connection if it was the last subscription.

Upvotes: 1

Related Questions