Reputation: 163
I am new to RxJS. I have specific array which I am using the Observable.
[a,b,c,d,e,f,g,h]
I want to handle the back pressure and handle 3 elements everytime.
I want to split this up like this and emit:
[a,b,c] [d,e,f] [g,h]
I am using bufferCount(3) for this. But the problem with bufferCount is this it does emit the last interval. As the last interval has only 2 elements in it.
This is my sample code
from(streamOfArr$).pipe(
flatMap(somefunc()),
bufferCount(3),
tap((x) => {
console.log('3', x);
}),
flatMap(somefunc1()),
How can I emit the last interval in bufferCount.
Upvotes: 1
Views: 853
Reputation: 9124
I think instead of bufferCount
you might want to check mergeAll
with a concurrent number. mergeAll
wouldn't split the data into chunks, but it would only subscribe to the next observable, when another previous observable completed. Only n observables are hot at the same time.
function obs(n: number): Observable<number> {
return of(n).pipe(
tap(() => console.log('Processing: ', n)),
delay(n * 500),
);
}
const observables$ = [obs(1), obs(2), obs(3), obs(4), obs(5), obs(6), obs(7), obs(8), obs(9), obs(10)];
from(observables$).pipe(
mergeAll(3),
).subscribe(console.log);
See also stackblitz: https://stackblitz.com/edit/rxjs-rcruhv?file=index.ts
With this example you can see in the console, that 3 outputs are directly printed at the start, every other processing output appears when a subscription output occured.
Upvotes: 0
Reputation: 2344
I believe there is another problem here.
If you see this example: https://stackblitz.com/edit/rxjs-3jwznz
import { from } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
const source = from([1,2,3,4,5,6,7,8]).pipe(
bufferCount(3)
);
source.subscribe(x => console.log(x));
You will see that bufferCount emits the last interval (even if the last one has only 2). Are you sure your source observable is completing?
Upvotes: 2