Manish Gadhock

Reputation: 163

RxJS - split the array and emit when specific count is reached

I am new to RxJS. I have an array that I am turning into an Observable:

[a,b,c,d,e,f,g,h]

I want to handle back pressure by processing 3 elements at a time.

I want to split this up like this and emit:

[a,b,c] [d,e,f] [g,h]

I am using bufferCount(3) for this. The problem is that bufferCount does not emit the last group, because the last group has only 2 elements in it.

This is my sample code

from(streamOfArr$).pipe(
  flatMap(somefunc()),
  bufferCount(3),
  tap((x) => {
    console.log('3', x);
  }),
  flatMap(somefunc1()),
);

How can I make bufferCount emit the last group?

Upvotes: 1

Views: 853

Answers (2)

MoxxiManagarm

Reputation: 9124

Instead of bufferCount, you might want to look at mergeAll with a concurrency limit. mergeAll does not split the data into chunks. It subscribes to the next inner observable only when one of the currently subscribed inner observables completes, so at most n inner observables are active at the same time.

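import { from, Observable, of } from 'rxjs';
import { delay, mergeAll, tap } from 'rxjs/operators';

// Logs when it is subscribed to, then emits n after n * 500 ms.
function obs(n: number): Observable<number> {
  return of(n).pipe(
    tap(() => console.log('Processing: ', n)),
    delay(n * 500),
  );
}

const observables$ = [obs(1), obs(2), obs(3), obs(4), obs(5), obs(6), obs(7), obs(8), obs(9), obs(10)];

// At most 3 inner observables are subscribed at any time.
from(observables$).pipe(
  mergeAll(3),
).subscribe(console.log);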

See also stackblitz: https://stackblitz.com/edit/rxjs-rcruhv?file=index.ts

With this example you can see in the console that 3 "Processing" lines are printed immediately at the start. Every further "Processing" line appears only after one of the running observables has emitted its value and completed.
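To make the timing above concrete, here is a rough sketch that simulates the schedule without RxJS. It is an idealized model under my own assumptions: simulateConcurrency and Slot are hypothetical names, each task is assumed to finish exactly n * 500 ms after it starts, and timer jitter is ignored.

```typescript
// Hypothetical helper: a synchronous simulation of a concurrency limit.
// It models the scheduling only and does not use RxJS.
interface Slot {
  id: number;
  start: number; // time in ms at which the task is started (subscribed)
  end: number;   // time in ms at which the task finishes (completes)
}

function simulateConcurrency(durations: number[], concurrent: number): Slot[] {
  const slots: Slot[] = [];
  // End times of the tasks that are currently running.
  const running: number[] = [];
  let now = 0;

  durations.forEach((duration, index) => {
    if (running.length === concurrent) {
      // All slots are busy: wait for the earliest running task to finish.
      running.sort((a, b) => a - b);
      now = running.shift() as number;
    }
    const end = now + duration;
    running.push(end);
    slots.push({ id: index + 1, start: now, end });
  });

  return slots;
}

// Same durations as obs(1) .. obs(10): n * 500 ms each.
const durations = Array.from({ length: 10 }, (_, i) => (i + 1) * 500);
const schedule = simulateConcurrency(durations, 3);

// Start times: 0, 0, 0, 500, 1000, 1500, 2500, 3500, 4500, 6000
console.log(schedule.map((s) => s.start));
```

The first three tasks start at 0 ms, and every later task starts at the moment the earliest running task finishes, which is the pattern you see in the console output of the real example.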

Upvotes: 0

Guilhermevrs

Reputation: 2344

I believe there is another problem here.

If you see this example: https://stackblitz.com/edit/rxjs-3jwznz

import { from } from 'rxjs';
import { bufferCount } from 'rxjs/operators';

const source = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
  bufferCount(3)
);

source.subscribe(x => console.log(x));

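You will see that bufferCount does emit the last group, even though it has only 2 elements. bufferCount flushes a partial buffer when the source completes, so if you never see the last group: are you sure your source observable is completing?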
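To illustrate why completion matters, here is a small dependency-free sketch of a count-based buffer. It is a simplified model, not RxJS's actual implementation, and CountBuffer is a hypothetical name I made up for this example. It only shows the idea that a partial group is held back until the source signals completion.

```typescript
// Simplified, dependency-free model of a count-based buffer.
// CountBuffer is a hypothetical name; this is not RxJS's implementation.
class CountBuffer<T> {
  private buffer: T[] = [];
  private readonly size: number;
  // Every group handed downstream is recorded here so it can be inspected.
  readonly emitted: T[][] = [];

  constructor(size: number) {
    this.size = size;
  }

  // Called once per source value; emits only when the buffer is full.
  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length === this.size) {
      this.emitted.push(this.buffer);
      this.buffer = [];
    }
  }

  // Called when the source completes; flushes a non-empty partial buffer.
  complete(): void {
    if (this.buffer.length > 0) {
      this.emitted.push(this.buffer);
      this.buffer = [];
    }
  }
}

const letters = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'];

// Source that never completes: the trailing ['g', 'h'] stays buffered.
const neverCompletes = new CountBuffer<string>(3);
letters.forEach((l) => neverCompletes.next(l));
console.log(neverCompletes.emitted); // [['a','b','c'], ['d','e','f']]

// Source that completes: the partial group is flushed as well.
const completes = new CountBuffer<string>(3);
letters.forEach((l) => completes.next(l));
completes.complete();
console.log(completes.emitted); // [['a','b','c'], ['d','e','f'], ['g','h']]
```

If your pipeline behaves like the first case, the source (or one of the inner observables returned by somefunc()) is probably still open, for example a Subject or an event stream that never calls complete.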

Upvotes: 2
