Reputation: 6541
Trying to hunt down the recipe i need but can't find it anywhere.
I have code that looks like this.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
How can i build a pipeline that:
person
that joins my listeners stream i subscribe them to the data stream.data:leave
event unsubscribes from the streamEDIT: What is the equivilent to this in a memory safe way:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Upvotes: 2
Views: 423
Reputation: 31115
I am not exactly sure of your observables behavior, but on a general level you could use any of the RxJS higher-order mapping operators (like switchMap
, concatMap
etc. - differences here) to map from one observable to another. And use RxJS takeUntil
operator to complete/unsubscribe from an observable based on another observable.
You could use the takeUntil
to also close all open subscriptions when the component is closed.
Try the following
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
Upvotes: 1
Reputation: 9124
I think you wanna look on the skip and take operators of rxjs.
Example:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
is here a continous stream of emitions with a incremental number each second. start
and end
emit once after a defined time. In the console you will see a limited range of the data streamed.
Stackblitz: https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts
Upvotes: 0