Reputation: 63
If I already have an observable, then what operator should I use to make this observable to produce value like, every 1 sec?
// this just an example, In my project, I can't control when the
// observable will produce value. I can assume that it will produce
// value very fast.
const obs = from([1,2,3,4,5]);
The obs will emit the value 1,2,3... very quickly. But what if I want it to emit the value every 1 sec? I mean, just make sure that obs emit value not too quick?
I checked the document from reactivex, and can't find a operator to do so. For example, delay, which just make the value production delay some time, but the relative time intervals between the values are preserved, and debounceTime do produce value periodically but ignore values at that time window.
Can someone tell me how to make the observable produce value on period time and not miss or ignore values?
Upvotes: 3
Views: 6044
Reputation: 12071
You could zip it with an interval observable like this:
import { zip, from, interval } from 'rxjs'
const obs = zip(
from([1,2,3,4,5]),
interval(1000),
(val, i) => val // Just emit the value
)
obs.subscribe(val => console.log(val))
If you want the first value to emit immediately then you could use timer instead of interval:
import { zip, from, timer } from 'rxjs'
const obs = zip(
from([1,2,3,4,5]),
timer(0, 1000),
(val, i) => val // Just emit the value
)
obs.subscribe(val => console.log(val))
You can also use a pipe if you prefer, like this:
import { from, interval } from 'rxjs'
import { zip } from 'rxjs/operators'
const obs = from([1,2,3,4,5])
.pipe(
zip(interval(1000), val => val)
)
obs.subscribe(val => console.log(val))
The zip
operator has been replaced with zipWith
, which has no resultSelector
parameter. zip
Will be removed in v8.
Therefore, the example above that uses the zip
operator can be updated as follows:
import { from, interval } from 'rxjs'
import { map, zipWith } from 'rxjs/operators'
const obs = from([1,2,3,4,5])
.pipe(
zipWith(interval(1000)),
map(val => val[0])
)
obs.subscribe(val => console.log(val))
Upvotes: 9
Reputation: 96889
You can really use delay
but you have to turn each delayed value into an Observable and then concat them into sequence with concatMap
.
from([1,2,3,4,5]).pipe(
concatMap(v => of(v).pipe(delay(1000))),
)
You could use zip
but that will work correctly only with test data. zip
emits only when all source Observables emit the same number of items. This means that if one Observable emits very fast, then slows down and then the second Observable start emitting fast it'll emit faster than with 1s
delays. This will happen even when interval
and zip
are combined because zip
internally buffers all values so it might stack multiple emissions from interval
and then reemit all of them at once if the second Observable emits very fast.
Upvotes: 2
Reputation: 2676
you can try something of this sort
var source = Rx.Observable
.range(1, 10)
.concatMap(function (x) {
return Rx.Observable
.of(x)
.delay(1000);
})
.timeInterval();
Upvotes: 0