Reputation: 21
For example, assume that we have stream like following
Stream 1 | -1-2-3-1-2-3--4-----------
after debounce, I would like to have the emitted stream looks like as follows:
Stream 2 | ---------------1-2-3--4------
There are lots of examples how to debounce the stream, but they take all value as the same trigger.
The following is the example code I found in reactitve-extension website,
var Rx = require('rxjs/Rx');
var times = [
{ value: 1, time: 100 },
{ value: 2, time: 200 },
{ value: 3, time: 300 },
{ value: 1, time: 400 },
{ value: 2, time: 500 },
{ value: 3, time: 600 },
{ value: 4, time: 800 }
];
// Delay each item by time and project value;
var source = Rx.Observable.from(times)
.flatMap(function (item) {
return Rx.Observable
.of(item.value)
.delay(item.time);
})
.debounceTime(500 /* ms */);
var subscription = source.subscribe(
function (x) {
console.log('Next: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
The console output would be
Next: 4
Completed
But I would like to get the following output
Next: 1
Next: 2
Next: 3
Next: 4
Completed
Maxime give good answer. I also try myself. Hope help someone who have the same question.
var Rx = require('rxjs/Rx');
var times = [
{ value: 1, time: 100 },
{ value: 2, time: 200 },
{ value: 3, time: 300 },
{ value: 1, time: 400 },
{ value: 2, time: 500 },
{ value: 3, time: 600 },
{ value: 4, time: 800 },
{ value: 5, time: 1500 }
];
// Delay each item by time and project value;
var source = Rx.Observable.from(times)
.flatMap(function (item) {
return Rx.Observable
.of(item.value)
.delay(item.time);
})
.do(obj => console.log('stream 1:', obj, 'at', Date.now() - startTime, `ms`))
.groupBy(obj => obj)
.flatMap(group => group.debounceTime(500))
let startTime = Date.now();
var subscription = source.subscribe(
function (x) {
console.log('stream 2: %s', x, 'at', Date.now() - startTime, 'ms');
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
The console will output
stream 1: 1 at 135 ms
stream 1: 2 at 206 ms
stream 1: 3 at 309 ms
stream 1: 1 at 409 ms
stream 1: 2 at 509 ms
stream 1: 3 at 607 ms
stream 1: 4 at 809 ms
stream 2: 1 at 911 ms
stream 2: 2 at 1015 ms
stream 2: 3 at 1109 ms
stream 2: 4 at 1310 ms
stream 1: 5 at 1510 ms
stream 2: 5 at 1512 ms
Completed
Upvotes: 1
Views: 990
Reputation: 23793
Here's the code I propose :
const { Observable } = Rx
const objs = [
{ value: 1, time: 100 },
{ value: 2, time: 200 },
{ value: 3, time: 300 },
{ value: 1, time: 400 },
{ value: 2, time: 500 },
{ value: 3, time: 600 },
{ value: 4, time: 800 }
];
const tick$ = Observable.interval(100)
const objs$ = Observable.from(objs).zip(tick$).map(x => x[0])
objs$
.groupBy(obj => obj.value)
.mergeMap(group$ =>
group$
.debounceTime(500))
.do(obj => console.log(obj))
.subscribe()
And the output is just as expected :
Here's a working Plunkr with demo https://plnkr.co/edit/rEI8odCrhp7GxmlcHglx?p=preview
Explanation :
I tried to make a small schema :
The thing is, you cannot use the debounceTime
directly on the main observable (that's why you only had one value). You've got to group every values in their own stream with the groupBy
operator and apply the debounceTime
to the splitted group of values (as I tried to show in the image). Then use flatMap
or mergeMap
to get one final stream.
Doc :
Here are some pages that might help you understand :
- groupBy
- debounceTime
- mergeMap
Upvotes: 3