K..
K..

Reputation: 4233

Simplifying nested subscriptions in RxJS

I have two observables

clients$ is a stream of client connections

data$ is a stream that delivers data endlessly

Whenever a client connects, I subscribe to data$ and send it to the client.

I read that nesting subscriptions is a no-go in RxJS, so this approach is probably wrong.

const clients$ = createClientStream()
const data$ = Observable.interval(1000).share()

clients$.subscribe(c => {

  const s = data$.subscribe(d => c.send(d))

  c.on('disconnect', () => s.unsubscribe())

})

What is the idiomatic RxJS approach to this?

Upvotes: 1

Views: 1172

Answers (1)

user3743222
user3743222

Reputation: 18665

You could use flatMap.

const clients$ = createClientStream()
const data$ = Observable.interval(1000).share()

clients$
  .flatMap(c => {
     const disconnect$ = Rx.Observable.create (observer => {
       c.on('disconnect', () => {observer.onNext({}); observer.onCompleted();})
     })
     return data$.takeUntil(disconnect$)
  }, (c,d) => c.send(d))

Upvotes: 2

Related Questions