Reputation: 4233
I have two observables
clients$
is a stream of client connections
data$
is a stream that delivers data endlessly
Whenever a client connects, I subscribe to data$
and send it to the client.
I read that nesting subscriptions is a no-go in RxJS, so this approach is probably wrong.
const clients$ = createClientStream()
const data$ = Observable.interval(1000).share()
clients$.subscribe(c => {
const s = data$.subscribe(d => c.send(d))
c.on('disconnect', () => s.unsubscribe())
})
What is the idiomatic RxJS approach to this?
Upvotes: 1
Views: 1172
Reputation: 18665
You could use flatMap
.
const clients$ = createClientStream()
const data$ = Observable.interval(1000).share()
clients$
.flatMap(c => {
const disconnect$ = Rx.Observable.create (observer => {
c.on('disconnect', () => {observer.onNext({}); observer.onCompleted();})
})
return data$.takeUntil(disconnect$)
}, (c,d) => c.send(d))
Upvotes: 2