zeyang yue

Reputation: 63

How to emit a value every 1 sec in RxJS

If I already have an observable, what operator should I use to make it emit a value only once every second?

// This is just an example. In my project, I can't control when the
// observable emits values. I can only assume that it emits them
// very fast.
const obs = from([1,2,3,4,5]);

Here obs emits the values 1, 2, 3... very quickly. What if I want it to emit one value every second instead? In other words, how do I make sure obs does not emit values too quickly?

I checked the ReactiveX documentation and couldn't find an operator that does this. For example, delay only shifts each emission later in time, so the relative time intervals between the values are preserved, and debounceTime does space the emissions out but drops the values that arrive within the time window.

Can someone tell me how to make the observable emit values at a fixed period without missing or ignoring any of them?

Upvotes: 3

Views: 6044

Answers (3)

Steve Holgado

Reputation: 12071

You could zip it with an interval observable like this:

import { zip, from, interval } from 'rxjs'

const obs = zip(
  from([1,2,3,4,5]),
  interval(1000),
  (val, i) => val // Just emit the value
)

obs.subscribe(val => console.log(val))

If you want the first value to emit immediately then you could use timer instead of interval:

import { zip, from, timer } from 'rxjs'

const obs = zip(
  from([1,2,3,4,5]),
  timer(0, 1000),
  (val, i) => val // Just emit the value
)

obs.subscribe(val => console.log(val))

You can also use a pipe if you prefer, like this:

import { from, interval } from 'rxjs'
import { zip } from 'rxjs/operators'

const obs = from([1,2,3,4,5])
  .pipe(
    zip(interval(1000), val => val)
  )

obs.subscribe(val => console.log(val))

Update

The pipeable zip operator has been deprecated in favor of zipWith, which has no resultSelector parameter. The zip operator will be removed in v8.

Therefore, the example above that uses the zip operator can be updated as follows:

import { from, interval } from 'rxjs'
import { map, zipWith } from 'rxjs/operators'

const obs = from([1,2,3,4,5])
  .pipe(
    zipWith(interval(1000)),
    map(val => val[0])
  )

obs.subscribe(val => console.log(val))

Upvotes: 9

martin

Reputation: 96889

You can actually use delay, but you have to turn each delayed value into its own Observable and then concatenate them into a sequence with concatMap.

import { from, of } from 'rxjs'
import { concatMap, delay } from 'rxjs/operators'

from([1,2,3,4,5]).pipe(
  concatMap(v => of(v).pipe(delay(1000))),
)

You could use zip, but that only works correctly with test data. zip emits its n-th value only once every source Observable has emitted its n-th item, and it internally buffers whatever arrives early. This means that if one Observable emits very fast, then slows down, and the other Observable then starts emitting fast, the output comes faster than with 1s delays. This happens even when interval and zip are combined: zip can stack up multiple emissions from interval and then re-emit all of them at once if the other Observable suddenly emits very fast.
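
To make the difference concrete, here is a rough sketch in plain JavaScript. It is a simulation, not RxJS code: the function names and the arrival times are made up for illustration, and it assumes interval(1000) ticks at 1000 ms, 2000 ms, and so on. It only computes the time at which each value would be emitted.

```javascript
// Hypothetical helper: emission times for zip(source, interval(period)).
// The n-th value is emitted once both the n-th source value and the
// n-th tick (at (n + 1) * period) have arrived.
function zipWithIntervalTimes(arrivals, period) {
  return arrivals.map((t, n) => Math.max(t, (n + 1) * period));
}

// Hypothetical helper: emission times for
// source.pipe(concatMap(v => of(v).pipe(delay(period)))).
// Each inner Observable starts only after the previous one has completed.
function concatMapDelayTimes(arrivals, period) {
  let free = 0; // time at which the previous inner Observable completed
  return arrivals.map(t => {
    const start = Math.max(t, free);
    free = start + period;
    return free;
  });
}

// A source that stays silent for 5 s and then emits three values in a burst.
const burst = [5000, 5001, 5002];

console.log(zipWithIntervalTimes(burst, 1000)); // [ 5000, 5001, 5002 ]
console.log(concatMapDelayTimes(burst, 1000));  // [ 6000, 7000, 8000 ]
```

With a source that emits everything at once, such as from([1,2,3]), both functions give 1000, 2000, 3000, which is why the problem does not show up with test data.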

Upvotes: 2

mulla.azzi

Reputation: 2676

You can try something like this:

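// Uses the older chained-operator API (before RxJS 6 introduced pipe)
var source = Rx.Observable
  .range(1, 10)
  .concatMap(function (x) {
    return Rx.Observable
      .of(x)
      .delay(1000);
  })
  // Wraps each value together with the time elapsed since the previous one
  .timeInterval();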

Upvotes: 0
