Reputation: 1146
Is there a nice way of creating a "buffer until changed" function? Basically, I have a sorted observable like [1,1,1,2,2,2,4,5,5]
and I want to create an observable of observables (or any collection, really) like [[1,1,1], [2,2,2], [4], [5,5]]
.
groupBy
could theoretically work in this case, but it causes huge memory consumption in my application due to it not closing the group observables immediately, which is unnecessary since I know the original observable is sorted.
Upvotes: 3
Views: 1070
Reputation: 14687
A solution that also works with immediate Observables (rxjs 5) :
source
.concat(Observable.of('~'))
.scan(({ buffer }, item) => buffer.length === 0 || buffer.includes(item) ?
({ buffer: [...buffer, item] }) :
({ buffer: [item], output: buffer }), { buffer: [] })
.pluck('output')
.filter(x => x);
http://jsbin.com/ruyozinali/1/edit?js,console
Upvotes: 2
Reputation: 23502
You could try using bufferWhen
combined with distinctUntilChanged
. Something like this:
source.bufferWhen(() => source.distinctUntilChanged())
Upvotes: 0