Reputation: 25
I am using RxJS 5.5.10.
I try to throttle an observable to fire every 5 seconds.
This Observable is based on a Promise.
Rx.Observable.fromPromise(mongo.AllWishes)
.flatMap(array => Rx.Observable.from(array))
.pluck('url')
.filter(s => s !== undefined)
.subscribe(m => console.log(m))
I understand that I can use the throttle operator to emit values only after a given time
Rx.Observable.interval(1000)
.throttle(val => Rx.Observable.interval(5000)
.subscribe(m => console.log('ping'))
But when I try something like
Rx.Observable.fromPromise(mongo.AllWishes)
.throttle(val => Rx.Observable.interval(5000))
.flatMap(array => Rx.Observable.from(array))
.pluck('url')
.filter(s => s !== undefined)
.subscribe(m => console.log(m))
I get an error
rxjs/observable/PromiseObservable.js:76
root_1.root.setTimeout(function () { throw err; });
^
TypeError: this.durationSelector is not a function
What am I missing ? Thank your for your help
Upvotes: 0
Views: 708
Reputation: 5988
I'm not entirely clear on your expectations. It looks like you are getting an array from a promise and are then wanting to emit each value sequentially with 5 seconds in-between each item.
If so, I think that this should do what you want. As far as your error it is hard to tell without being able to run your code. I'm assuming it has something to do with your promise since I can replace mongo.AllWishes
with my own promise and it doesn't error.
const data = [
{ url: 'https://something.com/1' },
{ url: 'https://something.com/2' },
{ url: 'https://something.com/3' },
{ url: 'https://something.com/4' },
{ url: 'https://something.com/5' }
];
const myPromise = new Promise((resolve) => {
setTimeout(() => { resolve(data); }, 1000);
});
Rx.Observable.fromPromise(myPromise)
.flatMap(x => {
return Rx.Observable.timer(0, 1000)
.takeWhile(i => i < x.length)
.map(i => x[i]);
})
.pluck('url')
.subscribe((url) => { console.log(url); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>
Upvotes: 1