Mike Pedersen

Reputation: 1146

RxJS buffer until changed

Is there a nice way of creating a "buffer until changed" function? Basically, I have a sorted observable like [1,1,1,2,2,2,4,5,5] and I want to create an observable of observables (or any collection, really) like [[1,1,1], [2,2,2], [4], [5,5]].

groupBy could theoretically work in this case, but it causes huge memory consumption in my application because it does not close the group observables immediately. Keeping them open is unnecessary, since I know the original observable is sorted.

Upvotes: 3

Views: 1070

Answers (2)

ZahiC

Reputation: 14687

A solution that also works with Observables that emit immediately (RxJS 5):

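source
  // Append a sentinel so the final group is flushed; assumes '~' never occurs in source.
  .concat(Observable.of('~'))
  // Extend the buffer while the item matches; otherwise emit the finished buffer and start a new one.
  .scan(({ buffer }, item) => buffer.length === 0 || buffer.includes(item) ?
        ({ buffer: [...buffer, item] }) :
        ({ buffer: [item], output: buffer }), { buffer: [] })
  // Keep only the steps that completed a group.
  .pluck('output')
  .filter(x => x);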

http://jsbin.com/ruyozinali/1/edit?js,console
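
For anyone who wants to see the same idea without RxJS, here is a minimal sketch in plain JavaScript. It keeps a single buffer, emits it whenever the incoming value differs from the last buffered one, and flushes the final run on completion instead of relying on the '~' sentinel. The names createRunBuffer and emit are hypothetical and not part of RxJS; the sketch also assumes items can be compared with !==.

```javascript
// Hypothetical helper (not an RxJS API): groups consecutive equal values.
// `emit` is called once for each finished run of equal items.
function createRunBuffer(emit) {
  let buffer = [];
  return {
    // Feed one item; a change in value closes the current run.
    next(item) {
      if (buffer.length > 0 && buffer[buffer.length - 1] !== item) {
        emit(buffer);
        buffer = [];
      }
      buffer.push(item);
    },
    // Flush the last run when the input ends (takes the place of the sentinel).
    complete() {
      if (buffer.length > 0) {
        emit(buffer);
        buffer = [];
      }
    },
  };
}

// Usage with the sorted input from the question.
const runs = [];
const runBuffer = createRunBuffer(run => runs.push(run));
[1, 1, 1, 2, 2, 2, 4, 5, 5].forEach(item => runBuffer.next(item));
runBuffer.complete();
console.log(JSON.stringify(runs)); // [[1,1,1],[2,2,2],[4],[5,5]]
```

Only the current run is held in memory at any time, which is what the sorted input makes possible.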

Upvotes: 2

Luka Jacobowitz

Reputation: 23502

You could try using bufferWhen combined with distinctUntilChanged. Something like this:

source.bufferWhen(() => source.distinctUntilChanged())

Upvotes: 0
