Jeff Nien
Jeff Nien

Reputation: 21

How to get throttled value in rxjs?

From rxjs website throttle example,

The emitted sequence is 0 4 2 1 3.

The output sequence is 0 1. (Because 4, 2 and 1 was dropped)

var Rx = require('rxjs/Rx');
var times = [
  { value: 0, time: 100 },
  { value: 1, time: 600 },
  { value: 2, time: 400 },
  { value: 3, time: 900 },
  { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .throttleTime(300 /* ms */);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

The console will output

Next: 0 (at 100ms) // The value 4 was dropped(at 200ms)
                   // The value 2 was dropped(at 400ms)
Next: 1 (at 600ms)
                   // The value 3 was dropped(at 900ms)
Completed

But, is it possible to get the dropped value stream?

Next: 4 (at 200ms)
Next: 2 (at 400ms)
Next: 3 (at 900ms)
Completed

Upvotes: 1

Views: 77

Answers (1)

thatseeyou
thatseeyou

Reputation: 2012

  1. Attach index to source.
  2. Combine latest index value of source and throttled.
  3. Compare index of source and throttled. If source index > throttled index then source is not throttled.
  4. Remove attated index.

You can use this technique in another NOT case.

var Rx = require('rxjs/Rx');
var times = [
    { value: 0, time: 100 },
    { value: 1, time: 600 },
    { value: 2, time: 400 },
    { value: 3, time: 900 },
    { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
    .mergeMap(function (item) {
        return Rx.Observable
            .of(item.value)
            .delay(item.time);
    });

var indexedSource = source
    .scan((_, value, index) => {
        // console.log(`value = ${value}, index = ${index}`)
        return [value, index];
    }, undefined)
    .share();

var indexedThrottled = indexedSource
    .throttleTime(300 /* ms */);

var throttled = indexedThrottled
    .map(value => value[0]);

var notThrottled = Rx.Observable.combineLatest(indexedThrottled, indexedSource)
    .filter(combined => {
        var filteredIndex = combined[0][1];
        var sourceIndex = combined[1][1];

        return sourceIndex > filteredIndex ? true : false;
    })
    .map(combined => {
        return combined[1][0];
    });

source.subscribe(value => console.log(`source : ${value}`));
throttled.subscribe(value => console.log(`++++++ : ${value}`));
notThrottled.subscribe(value => console.log(`------ : ${value}`));

Upvotes: 1

Related Questions