Shanon Jackson

Reputation: 6541

RXJS - Avoid inner subscription

I'm trying to hunt down the recipe I need but can't find it anywhere.

I have code that looks like this.

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

How can I build a pipeline that:

EDIT: What is the equivalent of this, done in a memory-safe way:

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

Upvotes: 2

Views: 423

Answers (2)

Barremian

Reputation: 31115

I am not sure exactly how your observables behave, but in general you can use any of the RxJS higher-order mapping operators (switchMap, concatMap, mergeMap, and so on) to map from one observable to another, and the takeUntil operator to complete an observable, and so unsubscribe from it, when another observable emits.

You can also use takeUntil to close all open subscriptions when the component is destroyed.

Try the following:

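import { Subject, fromEvent } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

// Class members of the component (the surrounding class is omitted here).
complete$ = new Subject<void>();

Listeners.pipe(
  // Switch to the `DataStream` observable for each listener.
  // Note: `switchMap` unsubscribes from the previous listener's inner stream
  // when a new listener joins; use `mergeMap` to keep every listener active.
  switchMap((personListening) =>
    DataStream.pipe(
      tap((data) => personListening.send(data)),            // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))   // <-- stop when this listener leaves
    )
  ),
  takeUntil(this.complete$)      // <-- `complete$` emits in the `ngOnDestroy` hook
).subscribe({
  error: (err) => console.log(err)      // <-- handle errors; emitted values need no handling
});

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
  this.complete$.complete();
}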
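
If every listener should keep receiving data after others join (rather than only the most recent one), a mergeMap-based variant may fit better. The following is only a minimal sketch under stated assumptions: plain Subjects stand in for listen("data:join"), stream("data"), and the "data:leave" event, and the Listener interface and makeListener helper are hypothetical names made up for illustration.

```typescript
import { Subject } from 'rxjs';
import { filter, mergeMap, takeUntil, tap } from 'rxjs/operators';

// Hypothetical listener shape, for illustration only (not from the question).
interface Listener {
  id: string;
  received: number[];
  send(data: number): void;
}

const makeListener = (id: string): Listener => {
  const received: number[] = [];
  return { id, received, send: (data) => { received.push(data); } };
};

const joins$ = new Subject<Listener>(); // stands in for listen("data:join")
const leaves$ = new Subject<string>();  // stands in for the "data:leave" event
const data$ = new Subject<number>();    // stands in for stream("data")

// One outer subscription; mergeMap manages one inner subscription per listener.
const subscription = joins$.pipe(
  mergeMap((listener) =>
    data$.pipe(
      // Complete this listener's inner stream when that listener leaves.
      takeUntil(leaves$.pipe(filter((id) => id === listener.id))),
      tap((data) => listener.send(data)),
    ),
  ),
).subscribe();

const alice = makeListener('alice');
const bob = makeListener('bob');

joins$.next(alice);
data$.next(1);              // only alice is listening
joins$.next(bob);
data$.next(2);              // both are listening
leaves$.next('alice');
data$.next(3);              // only bob is listening
subscription.unsubscribe(); // tears down every remaining inner subscription
data$.next(4);              // nobody receives this

console.log(alice.received); // [1, 2]
console.log(bob.received);   // [2, 3]
```

Unsubscribing the single outer subscription also releases every inner one, which is what avoids the leak in the nested-subscribe version.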

Upvotes: 1

MoxxiManagarm

Reputation: 9124

I think you want to look at the skip and take operators of RxJS, in particular skipUntil and takeUntil.

Example:

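import { interval, timer } from 'rxjs';
import { skipUntil, takeUntil } from 'rxjs/operators';

const data = interval(1000);   // emits 0, 1, 2, ... once per second

const start = timer(4500);     // emits once after 4.5 s
const end = timer(21800);      // emits once after 21.8 s

data.pipe(
  skipUntil(start),            // ignore values until `start` emits
  takeUntil(end),              // complete when `end` emits
).subscribe(console.log);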

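Here data is a continuous stream that emits an incrementing number every second, while start and end each emit once after the given delay. The console therefore shows only a limited range of the streamed data: the values emitted after start fires (4.5 s) and before end fires (21.8 s), which here should be 4 through 20.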

Stackblitz: https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts
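
For a version that does not depend on timing, the same windowing can be sketched with Subjects standing in for data, start, and end. This is only an illustration; the names data$, start$, end$, seen, and completed are assumptions made for the example.

```typescript
import { Subject } from 'rxjs';
import { skipUntil, takeUntil } from 'rxjs/operators';

const data$ = new Subject<number>();
const start$ = new Subject<void>();
const end$ = new Subject<void>();

const seen: number[] = [];
let completed = false;

data$.pipe(
  skipUntil(start$), // drop values until start$ emits
  takeUntil(end$),   // complete as soon as end$ emits
).subscribe({
  next: (value) => seen.push(value),
  complete: () => { completed = true; },
});

data$.next(0); // skipped: start$ has not emitted yet
data$.next(1); // skipped
start$.next();
data$.next(2); // passes through
data$.next(3); // passes through
end$.next();   // completes the pipeline and unsubscribes from data$
data$.next(4); // ignored

console.log(seen, completed); // [2, 3] true
```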

Upvotes: 0
