schaenk
schaenk

Reputation: 672

How to subscribe to an observable as side effect?

I have a method which calls a another service to play a stream url within a player. playStream$() returns some streaming data which should be returned from the play$() method as well. As a side effect while playing a stream, i want to send a heartbeat, triggered by an interval.

play$(streamId: number, position?: number): Observable<Stream> {
  return this.playerService.playStream$(streamId, position)
    .pipe(
      tap(() => this.heartbeat$(streamId).subscribe())
    );
}

Since it's not a good practice to subscribe to another observable inside a tap() operator, i'm looking for a solution with switchMap, mergeMap oder similar. BUT: I'm not interested in the heartbeat's emitted value. I just need the Stream data, as soon playStream$() emits. The heartbeat is just a side effect, which could even fail. heartbeat$() has a takeUntil Operator who makes sure to unsubscribe from it if nessecairy.

I allready tried mergeMap and switchMap but with the effect, that the subscriber from play$() won't get the Stream data immediately.

What's the RxJS way to solve this?

Upvotes: 2

Views: 1976

Answers (1)

Barremian
Barremian

Reputation: 31105

I'm sure there are better solutions to this issue. I'd suggest the following quick solution

  1. Use combineLatestWith operator to trigger both the requests.
  2. Use startWith operater to emit a notification from the heartbeat$ observable and switchMap operator + NEVER to avoid emitting rest it's notifications. If a first value isn't emitted, combineLatestWith wouldn't emit. If remaining values are emitted, playStream$ would emit it's previous values as false positives.
  3. Use map operator to return only the playStream$ emissions.
import { timer, NEVER } from 'rxjs';
import { combineLatestWith, switchMap, takeUntil, map } from 'rxjs/operators';

private heartbeat$ = (streamId: any) = timer(0, 5000).pipe(
  switchMap((value) => 
    this.http.post('some url').pipe(
      switchMap(() => NEVER)
    )
  ),
  startWith(null),
  takeUntil(this.stopHeartbeat$)
);

play$(streamId: number, position?: number): Observable<Stream> {
  return this.playerService.playStream$(streamId, position).pipe(
    combineLatestWith(this.heartbeat$(streamId)),
    map([playStream, heartbeat] => playStream)
  );
}

Edit: Replace filter(() => !value) with startWith + switchMap + NEVER combo. If not, the subscriber would only get playStream$ emissions after the heartbeat$ has emitted it's first notification.

Edit 2: Remove interval() -> timer(). Made obsolete by the usage of startWith operator.

Upvotes: 1

Related Questions