charlie_pl

Reputation: 3094

Observable event limit/keep only one event in memory/backpressure

I have subscribed to a timer that produces an event every n seconds:

Observable.interval(1000)
  .startWith(0)
  .map(x => 'USER')

I also have another observable whose results are not available from the very beginning; it takes some time to resolve. The timer events accumulate, and when the other observable finally emits, I get a flood of requests.

  .zip(tokenService.token, (privilege: string, token: Token) => {
    /* get request body */
  })
  .flatMap((body: any) => { /* perform refresh request */ })
  .map(x => x.json())
  .subscribe(json => {
    const token = json['token'];
    tokenService.setToken(token);
  });

Is there a way to keep only the last event from the timer and discard the rest?

.last() does not work for me: it returns only one event and then nothing more, so I don't see the subsequent timer events.

Maybe this is not a good angle on my problem? I want to refresh the token every n seconds, and only if I have a valid token in hand (right now a service provides an Observable<Token>).

Edit: OK, I found out this is called backpressure, and there is an article about it: https://github.com/ReactiveX/RxJava/wiki/Backpressure

The question still stands, though.

Upvotes: 3

Views: 727

Answers (1)

dotcs

Reputation: 2296

You basically want to re-trigger an event based on the current state of an Observable. If the token is valid and some time has passed, a new token should be created; otherwise nothing should happen. withLatestFrom fits this: each timer tick is combined with the most recent token, and ticks that fire before any token exists are dropped rather than queued.

The following example, which is on jsbin, shows one way to do this. Use the create button to create a new valid token. From then on, a new token (valid for five seconds) is generated every second. Once you invalidate the token with the invalidate button, generation of new tokens stops.

function createToken() {
  // Here you would do some (async) things to get a valid token.
  return Promise.resolve({ validUntil: new Date(Date.now() + 5000) });
}

function isTokenValid(token) {
  const date = new Date();
  return token.validUntil > date;
}

// Subject holds copy of latest token.
const token$ = new Rx.ReplaySubject(1);

// Defines the time interval to periodically query new tokens.
const period$ = Rx.Observable.interval(1000);

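period$
  // Pair each tick with the latest token. Ticks that fire before token$ has
  // emitted are dropped, so nothing queues up.
  .withLatestFrom(token$, (p, token) => {
    if (isTokenValid(token)) {
      // Side effect: refresh in the background and publish the new token.
      createToken()
        .then(newToken => token$.next(newToken));
    }
    return token;
  })
  .filter(token => isTokenValid(token))
  .subscribe(x => console.log(x));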

// Clicking the invalidate button pushes an already-expired token,
// which stops the refresh cycle. (subscribe returns a Subscription.)
const invalidateSub = Rx.Observable.fromEvent(
  document.getElementById('invalidateBtn'), 'click')
  .subscribe(() => {
    token$.next({ validUntil: new Date(0) });
  });

// Clicking the create button produces the first valid token.
const createSub = Rx.Observable.fromEvent(
  document.getElementById('createBtn'), 'click')
  .subscribe(() => {
    createToken()
      .then((token) => token$.next(token));
  });
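
To see why swapping zip for withLatestFrom removes the flood, here is a small library-free sketch that only models the timing. It is not RxJS code. The function name refreshTimes, the 3500 ms delay before the first token, and the 100 ms request latency are made up for illustration, and it assumes the token stream emits again after every refresh (as tokenService.setToken appears to do in the question). The validity check from the example above is left out.

```typescript
type Mode = 'zip' | 'withLatestFrom';

// Returns the times (ms) at which refresh requests would be sent.
// Assumption: the token stream emits once at `firstTokenAt` and again
// `latency` ms after every refresh request.
function refreshTimes(
  mode: Mode,
  tickTimes: number[],
  firstTokenAt: number,
  latency: number
): number[] {
  const sent: number[] = [];
  if (mode === 'zip') {
    // zip pairs the i-th tick with the i-th token, so early ticks wait in a
    // queue and are consumed one by one as soon as tokens show up.
    let nextTokenAt = firstTokenAt;
    for (const tick of tickTimes) {
      const sentAt = Math.max(tick, nextTokenAt);
      sent.push(sentAt);
      nextTokenAt = sentAt + latency; // the refresh result is the next token
    }
  } else {
    // withLatestFrom emits only when a tick fires and a token has already
    // been seen; ticks before the first token are dropped, not buffered.
    for (const tick of tickTimes) {
      if (tick >= firstTokenAt) {
        sent.push(tick);
      }
    }
  }
  return sent;
}

const ticks = [0, 1000, 2000, 3000, 4000, 5000];
const zipTimes = refreshTimes('zip', ticks, 3500, 100);
const latestTimes = refreshTimes('withLatestFrom', ticks, 3500, 100);

console.log(zipTimes);    // [3500, 3600, 3700, 3800, 4000, 5000]
console.log(latestTimes); // [4000, 5000]
```

With zip, the four ticks queued before the first token all turn into requests within 300 ms. With withLatestFrom, requests follow the timer's one-second rhythm once a token exists.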

Upvotes: 1
