Jeff Nien

Reputation: 21

How to use debounce stream based on value?

For example, assume we have a stream like the following:

Stream 1 | -1-2-3-1-2-3--4-----------

After debouncing, I would like the emitted stream to look like this:

Stream 2 | ---------------1-2-3--4------

There are lots of examples of how to debounce a stream, but they all treat every value as the same trigger.

The following is example code I found on the Reactive Extensions website:

var Rx = require('rxjs/Rx');
var times = [
    { value: 1, time: 100 },
    { value: 2, time: 200 },
    { value: 3, time: 300 },
    { value: 1, time: 400 },
    { value: 2, time: 500 },
    { value: 3, time: 600 },
    { value: 4, time: 800 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .debounceTime(500 /* ms */);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

The console output would be

Next: 4
Completed

But I would like to get the following output

Next: 1
Next: 2
Next: 3
Next: 4
Completed

Maxime gave a good answer. I also tried it myself; I hope this helps anyone who has the same question.

var Rx = require('rxjs/Rx');
var times = [
    { value: 1, time: 100 },
    { value: 2, time: 200 },
    { value: 3, time: 300 },
    { value: 1, time: 400 },
    { value: 2, time: 500 },
    { value: 3, time: 600 },
    { value: 4, time: 800 },
    { value: 5, time: 1500 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
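  // startTime is only read when a value arrives, i.e. after it has been set below.
  .do(obj => console.log('stream 1:', obj, 'at', Date.now() - startTime, 'ms'))
  // Split the stream by value, debounce each group on its own, then merge the groups back.
  .groupBy(obj => obj)
  .flatMap(group => group.debounceTime(500));

let startTime = Date.now();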
var subscription = source.subscribe(
  function (x) {
    console.log('stream 2: %s', x, 'at', Date.now() - startTime, 'ms');
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

The console will output

stream 1: 1 at 135 ms
stream 1: 2 at 206 ms
stream 1: 3 at 309 ms
stream 1: 1 at 409 ms
stream 1: 2 at 509 ms
stream 1: 3 at 607 ms
stream 1: 4 at 809 ms
stream 2: 1 at 911 ms
stream 2: 2 at 1015 ms
stream 2: 3 at 1109 ms
stream 2: 4 at 1310 ms
stream 1: 5 at 1510 ms
stream 2: 5 at 1512 ms
Completed
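The timings in this log can be checked without running RxJS. Below is a minimal sketch in plain JavaScript that replays the first seven events in virtual time. `debounceByKey` is a hypothetical helper written for this illustration, not an RxJS function. It assumes the source stays open long enough for every timer to fire, and that an event arriving exactly at the deadline does not cancel the pending one.

```javascript
// Hypothetical helper (not part of RxJS): computes, in virtual time, what a
// per-key debounce would emit. Assumes the source does not complete early.
function debounceByKey(events, dueTime, keyOf) {
  const sorted = [...events].sort((a, b) => a.time - b.time);
  const emitted = [];
  sorted.forEach((event, i) => {
    const key = keyOf(event);
    // An event is dropped if a later event with the same key arrives
    // before its own timer (event.time + dueTime) would have fired.
    const superseded = sorted
      .slice(i + 1)
      .some(later => keyOf(later) === key && later.time - event.time < dueTime);
    if (!superseded) {
      emitted.push({ value: event.value, time: event.time + dueTime });
    }
  });
  return emitted.sort((a, b) => a.time - b.time);
}

const sourceEvents = [
  { value: 1, time: 100 },
  { value: 2, time: 200 },
  { value: 3, time: 300 },
  { value: 1, time: 400 },
  { value: 2, time: 500 },
  { value: 3, time: 600 },
  { value: 4, time: 800 }
];

// One timer per distinct value: each value is debounced separately.
const perValue = debounceByKey(sourceEvents, 500, e => e.value);
console.log(perValue.map(e => e.value + '@' + e.time).join(' '));
// prints "1@900 2@1000 3@1100 4@1300"

// One shared timer for everything: the plain debounceTime(500) case.
const allTogether = debounceByKey(sourceEvents, 500, () => 'all');
console.log(allTogether.map(e => e.value + '@' + e.time).join(' '));
// prints "4@1300"
```

The predicted 900, 1000, 1100 and 1300 ms line up with the 911, 1015, 1109 and 1310 ms in the log. The value 5 is not covered by this sketch: in the log it shows up at 1512 ms instead of 2000 ms because the source completes right after emitting it, and debounceTime emits a pending value on completion instead of waiting for the timer.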

Upvotes: 1

Views: 990

Answers (1)

maxime1992

Reputation: 23793

Here's the code I propose:

const { Observable } = Rx

const objs = [
  { value: 1, time: 100 },
  { value: 2, time: 200 },
  { value: 3, time: 300 },
  { value: 1, time: 400 },
  { value: 2, time: 500 },
  { value: 3, time: 600 },
  { value: 4, time: 800 }
];

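// Emits 0, 1, 2, ... every 100 ms; used only to pace the source below.
const tick$ = Observable.interval(100)

// Pair each object with a tick so the objects arrive 100 ms apart, then drop the tick.
const objs$ = Observable.from(objs).zip(tick$).map(x => x[0])

objs$
  // One inner stream per distinct value.
  .groupBy(obj => obj.value)
  // Debounce each inner stream on its own, then merge them back together.
  .mergeMap(group$ =>
    group$
      .debounceTime(500))
  .do(obj => console.log(obj))
  .subscribe()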

And the output is just as expected:

[image: output]

Here's a working Plunkr with demo https://plnkr.co/edit/rEI8odCrhp7GxmlcHglx?p=preview


Explanation :

I tried to draw a small diagram:

[image: diagram]

The thing is, you cannot use debounceTime directly on the main observable: every new value, whatever it is, resets the same timer, which is why you only got one value. You have to split the values into their own streams with the groupBy operator and apply debounceTime to each group separately (as I tried to show in the image). Then use flatMap or mergeMap to merge the groups back into one final stream.
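To show the idea outside of RxJS, here is a minimal sketch in plain JavaScript that keeps one pending timer per key, which is conceptually what grouping and then debouncing each group achieves. `createKeyedDebouncer` and its `timers` parameter are hypothetical names made up for this illustration; this is not how RxJS implements its operators, and it does not handle completion.

```javascript
// Hypothetical helper (not part of RxJS): debounces values separately per key.
// `timers` is injectable so the logic can be exercised with fake timers.
function createKeyedDebouncer(
  dueTime,
  onEmit,
  timers = {
    set: (fn, ms) => setTimeout(fn, ms),
    clear: handle => clearTimeout(handle)
  }
) {
  const pending = new Map(); // key -> handle of the timer currently waiting

  return function push(key, value) {
    // A new value for the same key cancels that key's pending emission only;
    // timers belonging to other keys are left alone.
    if (pending.has(key)) {
      timers.clear(pending.get(key));
    }
    const handle = timers.set(() => {
      pending.delete(key);
      onEmit(value);
    }, dueTime);
    pending.set(key, handle);
  };
}

// Usage sketch (real timers, so output appears about 500 ms after the last push per key):
//   const push = createKeyedDebouncer(500, v => console.log('Next:', v));
//   push(1, 1); push(2, 2); push(1, 1);
```

With a single shared key this degenerates to an ordinary debounce, which is the behaviour of calling debounceTime on the main observable.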

Doc :
Here are some pages that might help you understand :
- groupBy
- debounceTime
- mergeMap

Upvotes: 3
