glen4096
glen4096

Reputation: 874

RxJs observable to listen for 10 seconds, return only 5 of the received values, discard the rest, and continue listening?

I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.

I wrote an observable pipe to try to achieve this by adding all of the received trades into a buffer for 10 seconds, then creating an observable for each of the trades in the buffer array, using 'concatMap' and 'from'. Then, creating another buffer that collects 5 values, and emits them.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

However, the pipe keeps emitting all of the values that it receives in the 10 second window, but in groups of 5. I tried adding a take(5)in the pipe after the concat map, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with index after the concatMap like this:

filter((v, i) => (i < 6 )),

This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.

I'm not sure if I'm overlooking something obvious here, but I've looked at many of the rxjs operators and can't find a way to achieve this

Upvotes: 0

Views: 1914

Answers (3)

Mrk Sef
Mrk Sef

Reputation: 8022

Sounds like all you need is bufferTime. You can decide what to keep and what to throw away afterward.

this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);

Upvotes: 3

Steve Trout
Steve Trout

Reputation: 9319

bufferTime has a maxBufferSize argument that will do this for you.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

You could also use windowTime instead to output each value as soon as it's created, rather than waiting for all 5.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

These are covered in the documentation for bufferTime and windowTime respectively.

Upvotes: 2

David Kidwell
David Kidwell

Reputation: 720

What about something like this,

let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);

Upvotes: 2

Related Questions