Christopher Jakob

Reputation: 493

How to take an action after the RxJS takeUntil operator fires

I am trying to set a boolean to true if the running function runs long enough to trigger takeUntil, which is driven by a timer.

Here is the code:

start = this.http.get(environment.shochat_content_creator_set_valid_stream_start).pipe(
  tap(() => console.log('Stream start'))
);

poll = this.http.get(environment.check_if_stream_is_active_on_mux).pipe(
  tap(() => {
    this.streamready = true;
  }),
  catchError(error => {
    console.log(error);
    return EMPTY;
  })
);

startastream() {
  const endtimer = timer(60000);
  this.streampollsubscription = this.start.pipe(
    switchMap(() => timer(0, 5000).pipe(
      tap(() => console.log('Polling every 5s')),
      mergeMap(() => this.poll)
    )),
    takeUntil(endtimer)
  ).subscribe();
}

Essentially, I want a boolean to be set to true if the takeUntil does fire:

timeout = true;

I have been looking at the Stack Overflow post "Do some action after takeUntil", but it is not as clear as I would like.

Upvotes: 4

Views: 4489

Answers (2)

Jonathan Stellwag

Reputation: 4287

You can use merge and reuse your takeUntil condition to emit a mapped true value when the condition (endtimer) fires:

const { Subject, merge } = rxjs;
const { takeUntil, mapTo } = rxjs.operators;

const source$ = new Subject();
const condition$ = new Subject();

// Use the merge to work around your completed observable
const result$ = merge(
  // When your condition fires map this event to true
  condition$.pipe(mapTo(true)),
  // Your already existing pipe in which the takeUntil lives
  source$.pipe(
    takeUntil(condition$)
  )
)

result$.subscribe(console.log)

source$.next(1);
source$.next(2);
condition$.next();
source$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

FYI: I am not sure if this solution works for you in the context of your application as you did not show where timeout is being declared and set.
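
As a rough sketch of how the merge idea could be used to set a flag, the variant below tags each emission so that the notifier's event cannot be confused with a source value that happens to be true. The names state, timeout$, values$ and the kind field are hypothetical and not part of the original snippet, and Subjects stand in for the real HTTP and timer streams. It uses map(() => ...) in place of mapTo, which is an equivalent substitution.

```typescript
import { Subject, merge } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

// Stand-ins for the real streams (assumption: Subjects replace HTTP polling and the timer).
const source$ = new Subject<number>();
const condition$ = new Subject<void>();

// Hypothetical holder for the flag and the values that got through.
const state = { timedOut: false, received: [] as number[] };

// Tag the notifier's emission so it is distinguishable from source values.
const timeout$ = condition$.pipe(
  map(() => ({ kind: 'timeout' as const }))
);

// The existing pipe with takeUntil, with its values tagged as well.
const values$ = source$.pipe(
  takeUntil(condition$),
  map(value => ({ kind: 'value' as const, value }))
);

merge(timeout$, values$).subscribe(event => {
  if (event.kind === 'timeout') {
    state.timedOut = true;
  } else {
    state.received.push(event.value);
  }
});

source$.next(1);
source$.next(2);
condition$.next(); // the condition fires: flag is set and takeUntil ends the source pipe
source$.next(3);   // ignored, the source pipe has already completed
```

With a timer as the condition, the merged stream would complete once the timer completes; with a Subject, as here, it stays open until the Subject is completed.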

Upvotes: 3

BizzyBob

Reputation: 14750

takeUntil completes the observable, so to perform an action upon completion there are a couple of places where you can do that:

  1. in the completion handler:
this.start.pipe(
    switchMap(() => timer(0, 5000).pipe(
        tap(() => console.log('Polling every 5s')),
        mergeMap(() => this.poll))
    ),
    takeUntil(endtimer)
)
.subscribe({
    next:     value => console.log('handling next value'),
    error:    error => console.log('handling error'),
    complete: ()    => this.timeout = true    // <--- this fires after observable completes
});
  2. using the finalize operator:
this.start.pipe(
    switchMap(() => timer(0, 5000).pipe(
        tap(() => console.log('Polling every 5s')),
        mergeMap(() => this.poll))
    ),
    takeUntil(endtimer),
    finalize(() => this.timeout = true)
)
.subscribe();

Note: these solutions are not exactly what you are asking for. They will fire when takeUntil fires, but they will also fire for any other reason the stream completes (and finalize additionally runs on error or unsubscribe). I don't think that distinction matters in your case, but I wanted to mention it in the context of the overall question.
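
To make that distinction concrete, here is a minimal sketch in which the notifier never emits, yet both hooks still run. Subjects stand in for the real HTTP and timer streams, and the names endtimer$, naturalEnd and unsubscribed are assumptions made for illustration.

```typescript
import { Subject } from 'rxjs';
import { finalize, takeUntil } from 'rxjs/operators';

// Stand-in for timer(60000); it never emits in this sketch.
const endtimer$ = new Subject<void>();

// Case 1: the source completes by itself.
const naturalEnd: string[] = [];
const source$ = new Subject<number>();

source$.pipe(
  takeUntil(endtimer$),
  finalize(() => { naturalEnd.push('finalize'); })
).subscribe({
  complete: () => { naturalEnd.push('complete'); }
});

source$.next(1);
source$.complete(); // both hooks run even though endtimer$ never fired

// Case 2: the subscriber unsubscribes before anything completes.
const unsubscribed: string[] = [];
const idle$ = new Subject<number>();

const subscription = idle$.pipe(
  takeUntil(endtimer$),
  finalize(() => { unsubscribed.push('finalize'); })
).subscribe({
  complete: () => { unsubscribed.push('complete'); }
});

subscription.unsubscribe(); // only finalize runs; the complete handler does not
```

If the flag must mean "the timer ended the stream" and nothing else, neither hook is sufficient on its own.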

As Sang Dang mentioned in the comments, you could also approach it from the angle of "when the timer goes off" (rather than what I've mentioned thus far, "when the observable completes"), which you can accomplish by simply adding a tap to your timer:

const endtimer = timer(60000).pipe(
    tap(() => this.timeout = true)
);
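
The following sketch illustrates why the tap-on-the-timer variant only reacts to the timer itself. It is not the asker's code: the run helper and the notifier$ Subject are hypothetical stand-ins for the real polling stream and timer(60000), chosen so the behavior can be observed without waiting.

```typescript
import { Subject } from 'rxjs';
import { takeUntil, tap } from 'rxjs/operators';

// Hypothetical helper: runs one scenario and reports whether the flag was set.
function run(fireNotifier: boolean): boolean {
  const source$ = new Subject<number>();
  const notifier$ = new Subject<void>(); // stand-in for timer(60000)
  let timeout = false;

  // Same idea as adding tap to the timer: the side effect lives on the notifier.
  const endtimer$ = notifier$.pipe(
    tap(() => { timeout = true; })
  );

  source$.pipe(takeUntil(endtimer$)).subscribe();

  source$.next(1);
  if (fireNotifier) {
    notifier$.next();   // the "timer" goes off: the flag is set, the stream ends
  } else {
    source$.complete(); // the stream ends by itself: the flag is left untouched
  }
  return timeout;
}

const flagAfterTimeout = run(true);
const flagAfterNaturalEnd = run(false);
```

One caveat with this design: the tap runs once per subscription to the notifier, so reusing the same piped timer in several takeUntil calls would run the side effect for each of them.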

Upvotes: 3
