szimek
szimek

Reputation: 6484

How to reset window duration after each value in throttle in Rx.js?

I'd like to be notified when a new value is produced, but only if some time has passed since the previous value was produced.

E.g. if a source starts producing values (at t=0s) and produces a new value every second for 10 seconds, then stops for 5 seconds and then starts again (at t=15s), assuming that the window duration is set to less than 5 seconds, I'd like to be notified only about the values at t=0 and t=15 (immediately after they are produced). Something like throttle, but where each new value resets the window duration.

Upvotes: 1

Views: 212

Answers (1)

user3743222
user3743222

Reputation: 18665

This was done in a rush, but it seems to be working: jsfiddle

The idea is to start with the first value, debounce with the specified duration, and take the next value after the debounce. source$ has to be shared because it is subscribed to several times.

// Elements that receive the log output: one for the raw click counter, one
// for the intermediate "first value after a pause" stream, one for the result.
var ta_message = document.getElementById('ta_message'),
    ta_intermediary = document.getElementById('ta_intermediary'),
    ta_result = document.getElementById('ta_result');

/**
 * Builds a logging callback that appends one line to an element.
 *
 * @param {Object} who  Object whose `innerHTML` is appended to.
 * @param {string} who_ Label placed at the start of every logged line.
 * @returns {Function} Callback taking a value `x`; it appends
 *   `<label> emits <JSON of x>` to `who.innerHTML` on a new line and
 *   returns nothing.
 */
function emits(who, who_) {
  function appendLine(x) {
    var previous = who.innerHTML;
    var entry = who_ + " emits " + JSON.stringify(x);
    // Joining (rather than concatenating) keeps the leading newline that
    // appears when the element is still empty.
    who.innerHTML = [previous, entry].join("\n");
  }

  return appendLine;
}

// Number of clicks seen so far; also the value emitted for each click.
var count = 0;
// Length of the quiet period, in milliseconds, passed to debounce below.
var Xms = 1700;

// Each click on #source is mapped to the next counter value and logged.
// The stream is shared because it is subscribed to several times below
// (by the debounce, by every inner take(1), and by the take(1) in the merge).
var source$ = Rx.Observable
  .fromEvent(document.getElementById('source'), 'click')
  .map(function nextCount() {
    count += 1;
    return count;
  })
  .do(emits(ta_message, 'count'))
  .share();

// After the source has been debounced by Xms, switch to a fresh take(1) on
// the source, i.e. wait for the next value that follows the pause.
var firstNextItem$ = source$
  .debounce(Xms)
  .flatMapLatest(function awaitNextValue() {
    return source$.take(1);
  })
  .do(emits(ta_intermediary, 'first value after Xms pause'));

// Final stream: the very first source value merged with the first value
// following each pause.
var result$ = Rx.Observable
  .merge(source$.take(1), firstNextItem$)
  .do(emits(ta_result, 'result'));

// The subscriber itself does nothing; all visible output comes from the
// do() steps above.
result$.subscribe(function ignore() {});

Upvotes: 1

Related Questions