Mark De Verno

Reputation: 428

RxJS buffer transform, but I want the initial value immediately

According to the documentation, the buffer operator emits nothing until its first interval has elapsed. What I'd like is to get the current value immediately, then only update every X seconds.

I've not been able to achieve this with RxJS yet. The closest I've come is to bind the observable, then use setTimeout to rebind after the buffer timeout occurs. This has the side effect of clearing the current value for those X seconds before the buffered values are emitted.

Any ideas?

Thanks!

Upvotes: 0

Views: 837

Answers (2)

concat

Reputation: 3187

Assuming by "current value immediately" you mean "first value as soon as it emits", you can buffer everything from the second element onward, and merge in the first:

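// source$: Observable<T>, X: buffer period in milliseconds
const pub_source$ = source$.publish();
const output$ = Observable.merge(
  pub_source$.take(1).map(first => [first]),          // first value, wrapped as T[]
  pub_source$.skip(1).buffer(Observable.interval(X))  // the rest, flushed every X ms
);
output$.subscribe(values => console.log(values));     // subscribe before connecting
pub_source$.connect();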

The source needs to be hot (multicast) so that take(1) and skip(1) see the same underlying sequence, so we use publish. The first element is also wrapped in an array to keep the output type T[] consistent.
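To make the expected output shape concrete, here is a small dependency-free sketch in plain JavaScript (not RxJS) that models what the merged stream would emit for a list of timestamped values. The name firstThenBuffered and the { t, value } event shape are invented for illustration. It assumes the interval ticks at X, 2X, ... measured from subscription, and it simplifies two things: empty windows are skipped (the real buffer emits [] for them), and source completion is ignored.

```javascript
// Rough model of "first value immediately, then buffer every X ms".
// events: [{ t: ms since subscription, value }], sorted by t (assumed shape).
function firstThenBuffered(events, X) {
  if (events.length === 0) return [];
  const [first, ...rest] = events;
  // The first value goes out as soon as it arrives, wrapped in an array.
  const out = [{ t: first.t, values: [first.value] }];
  // Group the remaining values by the interval tick that flushes them.
  const windows = new Map();
  for (const { t, value } of rest) {
    const tick = (Math.floor(t / X) + 1) * X; // next tick strictly after t
    if (!windows.has(tick)) windows.set(tick, []);
    windows.get(tick).push(value);
  }
  for (const [tick, values] of windows) out.push({ t: tick, values });
  return out;
}

const emitted = firstThenBuffered(
  [
    { t: 100, value: 'a' },
    { t: 300, value: 'b' },
    { t: 900, value: 'c' },
    { t: 1500, value: 'd' },
  ],
  1000
);
console.log(emitted);
// 'a' is emitted alone at t=100, ['b', 'c'] at t=1000, ['d'] at t=2000
```

The point of the model is only the shape: one single-element array right away, then one array per interval.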

Upvotes: 3

Richard Matsen

Reputation: 23483

There's an easier way using the zip operator; see the lightbulb note on learnrxjs:

Combined with interval or timer, zip can be used to time output from another source!

// Useful for slow source that emits at around the same rate as interval
// but suffers back-pressure with fast-emitting source

const interval = 1000
const output = Observable.zip(source, Observable.timer(0, interval))
  .map(x => x[0])

The Observable.timer 'regulates' the output from source. Note that timer's first parameter sets the delay before its first emission, so 0 here lets the first source value through as soon as it arrives.
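A rough, dependency-free model of that pairing in plain JavaScript (not RxJS) may help. The name zipWithTimer and the { t, value } event shape are invented for illustration. The sketch assumes the timer ticks at 0, interval, 2 * interval, ... and that zip releases the i-th source value once the i-th tick has also fired.

```javascript
// Rough model of zip(source, timer(0, interval)): the i-th source value
// (0-based) is paired with the i-th timer tick, so it is released at
// whichever of the two happens later.
function zipWithTimer(events, interval) {
  return events.map(({ t, value }, i) => {
    const emittedAt = Math.max(t, i * interval);
    return { value, emittedAt, lag: emittedAt - t };
  });
}

// Fast source: one value every 100 ms against a 1000 ms timer.
const fast = [0, 1, 2, 3].map(i => ({ t: i * 100, value: i }));
const paced = zipWithTimer(fast, 1000);
console.log(paced.map(e => e.lag)); // [ 0, 900, 1800, 2700 ]
```

In this model the first value has zero lag, while each later value from a fast source waits longer than the one before it, because values queue up behind the timer.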

Working example: CodePen

Footnote
I just realized this will create back-pressure (build-up of un-emitted values) if you have a lot of events per second, so buffer is the better way to go with a fast emitting source.

// Buffered version for fast source
const output2 = source.buffer(Observable.timer(0, interval))
  .filter(x => x.length)            // filter out empty buffer emits
  .flatMap(x => Observable.from(x)) // optional, converts array back to single emits

Upvotes: 2
