Reputation: 7876
I make thousands of calls to my server but in order to avoid overloading, I have put in place a concurrency limit of 10 with RxJS:
const calls = new Subject();
calls.pipe(
  mergeAll(10)
).subscribe();
while (int < unknown_number) {
  calls.next(new Observable((observer) => {
    // Call the server here
  }));
}
The problem is that I don't know how many calls will be made, and I need to know when the job is done. One approach would be to detect when nothing has been left in the queue for 5 seconds or so.
How can I do that?
Upvotes: 1
Views: 256
Reputation: 10157
You don't really need to create your own streams, just:
Start with a subject that triggers the first call:
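const call$ = new BehaviorSubject(0);
call$.pipe(
  concatMap(call => {
    return callYourDatabase(call); // Pagination or whatever
  }),
  tap(res => {
    // `res` is assumed to be falsy once there is nothing left to read
    if (res) {
      // Call again after completion if the previous call returned values
      call$.next(call$.value + 1); // assumes the emitted value is a page index
    }
  })
).subscribe(); // nothing runs until the stream is subscribed to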
Upvotes: 0
Reputation: 3588
After reading through your comment about the flow, I think that you don't need mergeMap, as each new request is issued only after the previous one has completed, so I would suggest the following approach:
let {
  of,
  Subject
} = rxjs;
let {
  switchMap,
  delay,
  scan
} = rxjs.operators;
const db = [1, 2, 3, 4, 5, 6, 7];
const requester = new Subject();
const mockRequest = ({ page, items }) =>
  of(db.slice(page * items, (page + 1) * items)).pipe(delay(500));
const ITEMS_PER_PAGE = 2;
requester
  .pipe(
    switchMap(pageData => mockRequest(pageData)),
    scan(
      (acc, x) => {
        const newIdx = +acc[0] + 1;
        return [newIdx, x];
      },
      [0, []]
    )
  )
  .subscribe(([i, x]) => {
    if (x.length !== 0) {
      requester.next({ page: i, items: ITEMS_PER_PAGE });
      console.log(x);
    } else {
      console.log("Done and ready to unsubscribe");
    }
  });
requester.next({ page: 0, items: ITEMS_PER_PAGE });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.js"></script>
What I'm doing here is calling the db for as long as it returns items. In your case you should just request 100 elements at a time, for as long as the db keeps returning elements.
Upvotes: 0
Reputation: 17762
After reading your comment, I think the answer lies in issuing a complete command against the Subject as soon as we know there is no more data to be read from the DB.
So, in a sort of pseudo-code, this could be a draft of the solution:
// callToServer is a function that calls the server and returns an Observable
const callToServer = (input: any): Observable<any> => httpClient.post(url, input);
const calls = new Subject<any>();
calls.pipe(
  // mergeMap allows setting the concurrency level as its second parameter
  mergeMap(input => callToServer(input), 10)
).subscribe(
  {
    next: response => {
      // do something with the response
    },
    error: err => {
      // manage error occurrences
    },
    complete: () => {
      // you get here once the Subject has completed and all pending calls have finished
    }
  }
);
const cursor = db.readFromMyTable();
while (cursor.hasMoreData()) { // pseudo-code: hypothetical check that the cursor still has data
  const data = cursor.getNext(100);
  // for each record issue a next against the calls Subject
  data.forEach(d => calls.next(d));
}
// when there are no more records, complete the calls Subject
calls.complete();
Upvotes: 1