Do side effect if observable has not emitted a value within X amount of time

I'm working on a use case where, if an observable has not emitted a value within a certain amount of time, we should perform some side effect.

To give a practical use case:

This requires a timer to be started on initial subscription to the observable and restarted on every emitted value. If the allotted time elapses without a new value, some function should run; if a value is emitted first, the timer resets. I'm struggling to do this the Rx way. Any help would be appreciated :)
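The mechanism described above can be sketched without any library, which may help pin down the intended behaviour before translating it to Rx. This is only a minimal sketch: `createIdleWatchdog`, `touch`, `stop`, and the injectable `timers` parameter are hypothetical names introduced for illustration and are not part of RxJS.

```javascript
// Hypothetical helper (not an RxJS API): runs `onIdle` whenever `timeoutMs`
// elapses without a call to `touch()`.
// `timers` is an optional { set, clear } pair so the clock can be swapped out;
// by default it wraps setTimeout / clearTimeout.
function createIdleWatchdog(timeoutMs, onIdle, timers) {
  const t = timers || {
    set: (fn, ms) => setTimeout(fn, ms),
    clear: (handle) => clearTimeout(handle),
  };
  let handle; // undefined while no timer is pending

  // (Re)start the countdown, cancelling any countdown already in progress.
  function arm() {
    if (handle !== undefined) t.clear(handle);
    handle = t.set(() => {
      handle = undefined;
      onIdle();
    }, timeoutMs);
  }

  // Cancel the pending countdown, if any.
  function stop() {
    if (handle !== undefined) {
      t.clear(handle);
      handle = undefined;
    }
  }

  arm(); // start counting immediately, i.e. "upon initial subscription"
  return { touch: arm, stop };
}
```

Calling `touch()` for every emitted value and `stop()` on unsubscribe would give the behaviour asked for. The answers express the same idea with operators.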

Upvotes: 7

Views: 3554

Answers (4)

Ben Lesh

Reputation: 108471

You can do this with race:

timer(5000).race(someSource$)
  .subscribe(notifyUser);

If someSource$ notifies faster than timer(5000) (5 seconds), then someSource$ "wins" and lives on.

If you only want one value from someSource$, you can obviously have a take(1) or first() on someSource$ and that will solve that issue.
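To make the "first to emit wins" behaviour concrete, here is a dependency-free sketch of race semantics. It is a simplified model, not the RxJS implementation: a "source" is assumed to be a plain function that takes a `next` callback and returns an unsubscribe function, and `manualSource` and `raceSources` are hypothetical helpers.

```javascript
// Hypothetical helper: a source whose values are pushed by hand via emit().
function manualSource() {
  const listeners = new Set();
  const subscribe = (next) => {
    listeners.add(next);
    return () => listeners.delete(next); // unsubscribe
  };
  const emit = (value) => [...listeners].forEach((next) => next(value));
  return { subscribe, emit };
}

// Simplified model of race: forward values from whichever source emits
// first, and unsubscribe from all the others at that moment.
function raceSources(...sources) {
  return (next) => {
    let winner = -1; // index of the winning source, -1 while undecided
    const unsubs = [];
    sources.forEach((subscribe, i) => {
      if (winner !== -1) return; // an earlier source already won synchronously
      unsubs[i] = subscribe((value) => {
        if (winner === -1) {
          winner = i;
          // Drop every losing source.
          unsubs.forEach((unsub, j) => { if (j !== i && unsub) unsub(); });
        }
        if (winner === i) next(value);
      });
    });
    // Unsubscribe from everything that is still subscribed.
    return () => unsubs.forEach((unsub) => unsub && unsub());
  };
}
```

In this model the subscriber is called with the winner's values whichever side wins, so the callback may need to distinguish a timer value from a source value.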

I hope that helps.

Upvotes: 7

concat

Reputation: 3187

debounceTime is the operator you're looking for: it only emits a value if no others follow within a specific timeout. Listening for the first message of the debounced stream will let you time out and clean up your websocket connection. If you need to time out starting from the opening of the stream, you can simply startWith. Concretely:

messages$.startWith(null)
         .debounceTime(timeout)
         .take(1)
         .subscribe(() => { /* side effects */ });

Edit: if instead you're looking to end the message stream entirely when it times out (e.g. you clean up in the onComplete handler), just cram debounceTime into a takeUntil:

messages$.takeUntil(
  messages$.startWith(null)
           .debounceTime(timeout)
).subscribe(timeout_observer);

Here timeout_observer is an Observer<TMessage> whose completion handler contains your cleanup.
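For illustration, the observer passed to subscribe in the snippet above (timeout_observer) could be shaped roughly like this. It is a sketch that assumes RxJS 5 or later, where an observer is a plain object with optional `next`, `error`, and `complete` methods. `makeTimeoutObserver`, `onMessage`, and `cleanup` are hypothetical names.

```javascript
// Hypothetical factory for an observer object.
// `onMessage` handles each value; `cleanup` stands in for whatever teardown
// is needed (for example closing a socket) and runs when the stream completes.
function makeTimeoutObserver(onMessage, cleanup) {
  return {
    next(message) {
      onMessage(message);
    },
    error(err) {
      console.error('stream failed:', err);
    },
    complete() {
      // With takeUntil, completion is what signals the timeout.
      cleanup();
    },
  };
}
```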

Upvotes: 8

John

Reputation: 4601

The timeout operator starts a timer on each element; if the next element takes more than 4 seconds to arrive, the stream errors with a timeout, and you can run your function in catchError.

Here is an example: it displays aa at t=0s, then bb at t=3s, and then times out 4 seconds later, because the last element, cc, takes 10 seconds to be displayed.

import { of } from 'rxjs/observable/of';
import { concat, delay, timeout, catchError } from 'rxjs/operators';

// simulate elements that appear at t=0s, then at t=3s, then 10s after that
const obs1$ = of('aa ');
const obs2$ = of('bb ').pipe(delay(3000));
const obs3$ = of('cc ').pipe(delay(10000));

// timeout(4000) errors if more than 4 seconds pass without an emission;
// execute your function in the catchError
const example2 = obs1$.pipe(concat(obs2$.pipe(concat(obs3$))), timeout(4000), catchError(a => of('timeout')));

const subscribe = example2.subscribe(val => console.log(val + ' ' + new Date().toLocaleTimeString()));

Upvotes: 0

Fan Cheung

Reputation: 11345

This might not be the perfect answer, but it does what you asked. Depending on how you want to disconnect, some variation may be needed.

const source = new Rx.Subject();
const duration = 2000;

source.switchMap(value => {
  // Pair each value with a status that flips to 'disconnect' after `duration` ms;
  // switchMap restarts this inner timer whenever a new value arrives
  return Rx.Observable.of(value).combineLatest(Rx.Observable.timer(duration).mapTo('disconnect').startWith('connected'))
}).flatMap(([emit, timer]) => {
  if (timer == 'disconnect') {
    console.log('go disconnect')
    return Rx.Observable.throw('disconnected')
  }
  return Rx.Observable.of(emit)
})
//.catch(e=>Rx.Observable.of('disconnect catch'))
.subscribe(value => console.log('subscribed->', value), console.log)

setTimeout(() => source.next('normal'), 300);
setTimeout(() => source.next('normal'), 300);
setTimeout(() => source.next('last'), 1800);
setTimeout(() => source.next('ignored'), 4000);
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Upvotes: 0
