Reputation: 853
I have a stream of events that I'd like to buffer, but the first element should be emitted immediately and only the elements after it should be buffered.
[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]
Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]
Is there a way to do this?
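To pin down the timeline above, here is a small plain-JavaScript model of the intended behaviour. It is not RxJS, only a sketch of the semantics: the function name `leadingThenBuffer`, the `{ t, v }` event shape and the timestamps are all made up for illustration, and it assumes the buffering window starts when the leading value arrives.

```javascript
// Plain-JavaScript model of the timeline in the question (not RxJS).
// `events` is a hypothetical list of { t, v } pairs sorted by time t (ms).
function leadingThenBuffer(events, windowMs) {
  const out = [];
  let windowEnd = null; // end time of the currently open window, or null
  let buffer = [];

  // Close the open window, emitting whatever was buffered inside it.
  const closeWindow = () => {
    if (buffer.length > 0) out.push({ t: windowEnd, v: buffer });
    buffer = [];
    windowEnd = null;
  };

  for (const { t, v } of events) {
    if (windowEnd !== null && t >= windowEnd) closeWindow();
    if (windowEnd === null) {
      out.push({ t, v: [v] });  // leading value passes straight through
      windowEnd = t + windowMs; // and opens a new buffering window
    } else {
      buffer.push(v);           // later values wait for the window to close
    }
  }
  if (windowEnd !== null) closeWindow();
  return out;
}

const emissions = leadingThenBuffer(
  [
    { t: 0, v: 1 }, { t: 10, v: 2 }, { t: 20, v: 3 },
    { t: 150, v: 4 }, { t: 160, v: 5 }, { t: 170, v: 6 },
  ],
  100
);
console.log(emissions.map(e => e.v)); // logs the groups [1], [2, 3], [4], [5, 6]
```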
Upvotes: 2
Views: 696
Reputation: 1309
I had the same problem and after playing around with it, I found this additional solution:
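import { asyncScheduler } from 'rxjs';
import { buffer, delay, throttleTime } from 'rxjs/operators';
// bufferTime here is the window length in ms, not the RxJS operator of the same name
source$.pipe(
  buffer(
    source$.pipe(
      throttleTime(bufferTime, asyncScheduler, { leading: true, trailing: true }),
      delay(10) // <-- This here bugs me like crazy though!
    )
  )
);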
Because throttleTime already features a leading option, you can just use it to trigger buffer emits manually.
I would really like to get rid of that delay here, though. It is necessary because the inner observable is triggered first, which causes the buffer to emit prematurely.
Upvotes: 0
Reputation: 853
The answers here clarified the problem for me and led me to what I actually needed, which was something like this:
import { Subject } from 'rxjs'
import { bufferTime, filter, map } from 'rxjs/operators'

function getLeadingBufferSubject (bufferTimeArg) {
  const source = new Subject()
  const result = new Subject()
  let didOutputLeading = false
  const buffered$ = source.pipe(
    bufferTime(bufferTimeArg),
    filter(ar => ar.length > 0),
    map(ar => [...new Set(ar)]) // drop duplicate values within a buffer
  )
  buffered$.subscribe(v => {
    // the window has closed: reset the flag so the next value is emitted as leading again
    didOutputLeading = false
    // emit the buffered values except the first, which was already emitted as the leading value
    const slicedArray = v.slice(1)
    if (slicedArray.length > 0) result.next(slicedArray)
  })
  // emit a value immediately if no leading value has been emitted in the current window
  source.subscribe(v => {
    if (!didOutputLeading) {
      didOutputLeading = true
      result.next(v)
    }
  })
  // call .next(value) on "source"
  // subscribe for results on "result"
  return {
    source,
    result
  }
}
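For comparison, the same leading-then-buffer idea can be sketched without RxJS. This is only an illustration under a few assumptions: `createLeadingBuffer`, `onEmit` and `schedule` are hypothetical names, the window starts when the leading value arrives rather than on a fixed interval, the leading value is delivered as a one-element array, and duplicates are not removed. The timer function is injectable so the timing can be stepped by hand.

```javascript
// Dependency-free sketch; `createLeadingBuffer` and `schedule` are hypothetical names.
function createLeadingBuffer(windowMs, onEmit, schedule = setTimeout) {
  let windowOpen = false;
  let buffer = [];

  return function push(value) {
    if (windowOpen) {
      buffer.push(value); // inside a window: hold the value back
      return;
    }
    windowOpen = true;
    onEmit([value]); // leading value is delivered immediately
    schedule(() => {
      // window elapsed: flush what accumulated, then wait for a new leader
      windowOpen = false;
      const flushed = buffer;
      buffer = [];
      if (flushed.length > 0) onEmit(flushed);
    }, windowMs);
  };
}

// Usage with a fake scheduler that just records the callbacks.
const pending = [];
const fakeSchedule = (fn) => { pending.push(fn); };
const seen = [];
const push = createLeadingBuffer(5000, (batch) => seen.push(batch), fakeSchedule);

push(1); push(2); push(3);
pending.shift()(); // simulate the first window elapsing
push(4); push(5); push(6);
pending.shift()(); // simulate the second window elapsing
console.log(seen); // logs the batches [1], [2, 3], [4], [5, 6]
```

In real use the third argument can be left out, so the default `setTimeout` closes each window after `windowMs` milliseconds.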
Upvotes: 0
Reputation: 2890
I think this can be solved by dividing your stream into two, firstValue$ and afterFirstValue$, and then merging them.
import { merge } from 'rxjs';
import { take, skip, bufferTime } from 'rxjs/operators';
...
firstValue$ = source$.pipe(
  take(1)
);
afterFirstValue$ = source$.pipe(
  skip(1),
  bufferTime(5000)
);
merge(firstValue$, afterFirstValue$)
  .subscribe(result => {
    // Do something
  });
Here is the same approach with the original source as a Subject. It is not exactly what you described, but I think it may be what you want.
import { merge, Subject } from 'rxjs';
import { take, skip, bufferTime } from 'rxjs/operators';
...
source$ = new Subject();
firstValue$ = source$.pipe(
  take(1)
);
afterFirstValue$ = source$.pipe(
  skip(1),
  bufferTime(5000)
);
merge(firstValue$, afterFirstValue$)
  .subscribe(result => {
    // Do something
  });
source$.next(1);
source$.next(1);
source$.next(1);
Upvotes: 3
Reputation: 96891
You can use multicast to split the stream into two and just pass the first value through.
import { concat, Subject } from "rxjs";
import { multicast, take, bufferCount } from "rxjs/operators";
source.pipe(
  multicast(
    new Subject(),
    s => concat(
      s.pipe(take(1)),
      s.pipe(bufferCount(X)),
    )
  ),
);
Upvotes: 2