nick
nick

Reputation: 3711

RXJS Create Observable for sockets

I'm trying to return an Observable from a service that would allow you to listen to any Pusher channel.

Works fine, expect I'd like to unsubscribe from the pusher channel when the Observable (generated using createRealtimeObservable(...)) was unsubscribed from.

Any ideas?

  public realtimeObservable (channelName, eventName): Observable<any> {

        const realtimeObservable$ = new Observable((observer) => {

          const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
          channel.bind(eventName, (data) => {
            observer.next(data.payload)
          })

        })

        return realtimeObservable$
      }

Upvotes: 0

Views: 312

Answers (1)

Mark van Straten
Mark van Straten

Reputation: 9425

When you manually create an Observable yourself it is your responsibility to cleanup all resources when unsubscribed to. Luckily the Observable has a mechanism for this by allowing you to return an unsubscribe/dispose function when creating:

return Observable.create((obs) => {
  const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
  channel.bind(eventName, (data) => {
    observer.next(data.payload)
  });

  return () => {
    // unsubscribe event
    channel.unsubscribe(); // NOTE: i do not know the syntax for unsubscribing a Pusher channel, impl as required.
  };
});

NB: Observable.create() is syntactic sugar for new Observable()

Upvotes: 1

Related Questions