melloc

Reputation: 853

BufferTime with leading option

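I have some events that I'd like to buffer, but only after the first element: the first element should be emitted immediately, and the elements that follow it within the buffer window should be buffered.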

[------bufferTime------]

Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]


Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]

Is there a way to do this?
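
To pin down the timeline above, here is a rough sketch of the behaviour in plain JavaScript, without RxJS. The helper name leadingBufferTimeline and the { time, value } event shape are made up for illustration, and it assumes that a new window only opens when the next value arrives after the previous window has closed, which is how I read the diagram.

```javascript
// Hypothetical helper (not part of RxJS): models the desired timeline on a
// list of { time, value } events that is already sorted by time.
function leadingBufferTimeline(events, windowMs) {
  const emissions = [];
  let windowEnd = -Infinity; // no window is open before the first value
  let pending = [];          // values collected inside the current window

  // Emit whatever was collected in the window that just closed, if anything.
  const flush = () => {
    if (pending.length > 0) {
      emissions.push({ time: windowEnd, values: pending });
      pending = [];
    }
  };

  for (const { time, value } of events) {
    if (time >= windowEnd) {
      // The previous window has closed: flush it, then emit this value
      // immediately as the leading value and open a new window.
      flush();
      emissions.push({ time, values: [value] });
      windowEnd = time + windowMs;
    } else {
      // Still inside the current window: buffer the value.
      pending.push(value);
    }
  }

  flush(); // the last window closes after the final value
  return emissions;
}

const events = [
  { time: 0, value: 1 }, { time: 10, value: 2 }, { time: 20, value: 3 },
  { time: 1200, value: 4 }, { time: 1210, value: 5 }, { time: 1220, value: 6 },
];
const emissions = leadingBufferTimeline(events, 1000);
console.log(JSON.stringify(emissions.map(e => e.values)));
// prints [[1],[2,3],[4],[5,6]]
```

This only describes the expected output for a fixed list of events. It is not a drop-in operator for a live stream.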

Upvotes: 2

Views: 696

Answers (4)

kraf

Reputation: 1309

I had the same problem and after playing around with it, I found this additional solution:

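import { asyncScheduler } from 'rxjs';
import { buffer, throttleTime, delay } from 'rxjs/operators';

// bufferTime here is the window duration in ms, not the RxJS operator
source$.pipe(
  buffer(source$.pipe(
    throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
    delay(10) // <-- This here bugs me like crazy though!
  ))
)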

Because throttleTime already has a leading option, you can use it to trigger the buffer emissions manually.

I would really like to get rid of that delay, though. It is necessary because the inner observable is triggered first, which causes the buffer to emit prematurely.

Upvotes: 0

melloc

Reputation: 853

I got really good answers that clarified the problem for me and led me to what I actually needed, which was something like this:

import { Subject } from 'rxjs'
import { bufferTime, filter, map } from 'rxjs/operators'

function getLeadingBufferSubject (bufferTimeArg) {
    const source = new Subject()
    const result = new Subject()

    let didOutputLeading = false

    const buffered$ = source
        .pipe(bufferTime(bufferTimeArg))
        .pipe(filter(ar => ar.length > 0))
        .pipe(map(ar => [...new Set(ar)]))

    buffered$.subscribe(v => {
        // reset the flag so the next value is emitted as a leading value
        didOutputLeading = false
        const slicedArray = v.slice(1)

        // emit the buffered values (except the first, which was already emitted)
        if (slicedArray.length > 0) result.next(slicedArray)
    })

    // emits first value if buffer is empty
    source.subscribe(v => {
        if (!didOutputLeading) {
            didOutputLeading = true
            result.next(v)
        }
    })

    // call .next(value) on "source"
    // subscribe for results on "result"
    return {
        source,
        result
    }
}

Upvotes: 0

SnorreDan

Reputation: 2890

I think this can be solved by dividing your stream into two, firstValue$ and afterFirstValue$, and then merging them.

import { merge } from 'rxjs';
import { take, skip, bufferTime } from 'rxjs/operators';

...

firstValue$ = source$.pipe(
  take(1)
);

afterFirstValue$ = source$.pipe(
  skip(1),
  bufferTime(5000)
);

merge(firstValue$, afterFirstValue$)
  .subscribe(result => {
    // Do something
  });

Answer to the follow-up question concerning Subject

Here the original source is a Subject. It is not exactly what you described, but I think it may be what you want.

  import { merge, Subject } from 'rxjs';
  import { take, skip, bufferTime } from 'rxjs/operators';

  ...

  source$ = new Subject();

  firstValue$ = source$.pipe(
    take(1)
  );

  afterFirstValue$ = source$.pipe(
    skip(1),
    bufferTime(5000)
  );

  merge(firstValue$, afterFirstValue$)
    .subscribe(result => {
      // Do something
    });

  source$.next(1);
  source$.next(1);
  source$.next(1);

Upvotes: 3

martin

Reputation: 96891

You can use multicast to split the stream into two and just pass the first value through.

import { concat, Subject } from "rxjs";
import { multicast, take, bufferCount } from "rxjs/operators";

source.pipe(
  multicast(
    new Subject(),
    s => concat(
      s.pipe(take(1)),
      s.pipe(bufferCount(X)),
    )
  ),
);

Upvotes: 2
