CptNapalm
CptNapalm

Reputation: 25

RXJS: How to throttle Observable from Promise

I am using RxJS 5.5.10.

I try to throttle an observable to fire every 5 seconds.

This Observable is based on a Promise.

  Rx.Observable.fromPromise(mongo.AllWishes)
    .flatMap(array => Rx.Observable.from(array))
    .pluck('url')
    .filter(s => s !== undefined)
    .subscribe(m => console.log(m))

I understand that I can use the throttle operator to emit values only after a given time

Rx.Observable.interval(1000)
  .throttle(val => Rx.Observable.interval(5000)
  .subscribe(m => console.log('ping'))

But when I try something like

  Rx.Observable.fromPromise(mongo.AllWishes)
    .throttle(val => Rx.Observable.interval(5000))
    .flatMap(array => Rx.Observable.from(array))
    .pluck('url')
    .filter(s => s !== undefined)
    .subscribe(m => console.log(m))

I get an error

rxjs/observable/PromiseObservable.js:76
                    root_1.root.setTimeout(function () { throw err; });
                                                         ^    
TypeError: this.durationSelector is not a function

What am I missing ? Thank your for your help

Upvotes: 0

Views: 708

Answers (1)

bygrace
bygrace

Reputation: 5988

I'm not entirely clear on your expectations. It looks like you are getting an array from a promise and are then wanting to emit each value sequentially with 5 seconds in-between each item.

If so, I think that this should do what you want. As far as your error it is hard to tell without being able to run your code. I'm assuming it has something to do with your promise since I can replace mongo.AllWishes with my own promise and it doesn't error.

const data = [
  { url: 'https://something.com/1' },
  { url: 'https://something.com/2' },
  { url: 'https://something.com/3' },
  { url: 'https://something.com/4' },
  { url: 'https://something.com/5' }
];
const myPromise = new Promise((resolve) => {
	setTimeout(() => { resolve(data); }, 1000);
});


Rx.Observable.fromPromise(myPromise)
	.flatMap(x => {
  	return Rx.Observable.timer(0, 1000)
  		.takeWhile(i => i < x.length)
  		.map(i => x[i]);
	})
	.pluck('url')
  .subscribe((url) => { console.log(url); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>

Upvotes: 1

Related Questions