MJ_Wales

Reputation: 893

rxjs - buffer stream until function returns true

I have a stream of numbers that increase by a constant amount, and I want to sub-sample it. Given a constant sample interval, I want to buffer the stream until the difference between the first and last buffered values is greater than or equal to the interval. It should then emit the buffered values as an array, similar to the buffer operator.

I've searched through the different rxjs operators but can't figure out how to make this work. A bufferUntil operator would be perfect but doesn't seem to exist. How can I implement this?

For example:

const interval = 15;
// example stream would be: 5, 10, 15, 20, 25, 30, ...

Observable.pipe(
   bufferUntil(bufferedArray => {
       let last = bufferedArray.length - 1;
       return (bufferedArray[last] - bufferedArray[0] >= interval);
   })
).subscribe(x => console.log(x));

// With an expected output of [5, 10, 15, 20], [25, 30, 35, 40], ...

Upvotes: 5

Views: 799

Answers (1)

frido

Reputation: 14139

You can implement an operator that maintains a custom buffer. Push all incoming values into the buffer, emit the buffer when it meets a given condition and reset it. Use defer to supply every subscriber with its own buffer.

import { defer, EMPTY, Observable, of, OperatorFunction } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    let buffer: T[] = []; // custom buffer, created once per subscription
    return source.pipe(
      tap(v => buffer.push(v)), // add values to buffer
      switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
      tap(() => buffer = []) // start a new buffer after each emission
    );
  });
}

https://stackblitz.com/edit/rxjs-7awqmv

Note that the above code will not emit any remaining items once the source completes. The following remedies that:

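import { defer, EMPTY, Observable, of, OperatorFunction } from 'rxjs';
import { endWith, switchMap, tap } from 'rxjs/operators';

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    const buffer: T[] = [];
    return source.pipe(
      tap(v => buffer.push(v)), // add values to buffer
      switchMap(() => emitWhen(buffer) ? of([...buffer]) : EMPTY), // emit a copy when the condition is met
      tap(() => buffer.splice(0, buffer.length)), // clear the buffer in place
      endWith(buffer) // on completion, emit whatever is left (an empty array if nothing remains)
    );
  });
}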
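
To sanity-check the predicate from the question without setting up a stream, the same buffering rule can be run over a plain array. This is only a rough sketch: bufferUntilSync and the sample array are hypothetical names made up for illustration and are not part of RxJS. One assumed difference from the operator above is that this version skips the final flush when nothing is left over.

```typescript
// Hypothetical helper (not an RxJS API): applies the same buffering rule to an array.
function bufferUntilSync<T>(values: T[], emitWhen: (currentBuffer: T[]) => boolean): T[][] {
  const result: T[][] = [];
  let buffer: T[] = [];
  for (const v of values) {
    buffer.push(v);          // add the value to the buffer
    if (emitWhen(buffer)) {  // condition met: emit and start a new buffer
      result.push(buffer);
      buffer = [];
    }
  }
  if (buffer.length > 0) {
    result.push(buffer);     // flush leftovers, mirroring the endWith variant
  }
  return result;
}

const interval = 15;
const sample = [5, 10, 15, 20, 25, 30, 35, 40, 45, 50]; // assumed sample data

const chunks = bufferUntilSync(sample, buf => buf[buf.length - 1] - buf[0] >= interval);
console.log(chunks);
// [[5, 10, 15, 20], [25, 30, 35, 40], [45, 50]]
```

The last chunk, [45, 50], never satisfies the condition and only appears because of the flush at the end, which is the case the second version of the operator handles on completion.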

Upvotes: 5
