Nate

Reputation: 7876

Process unknown number of observables in RxJS pipe

I make thousands of calls to my server, and to avoid overloading it I have put a concurrency limit of 10 in place with RxJS:

const calls = new Subject();
calls.pipe(
    mergeAll(10)
).subscribe();

while(int < unknown_number){
    calls.next(new Observable((observer) => {
        // Call the server here
    }))
}

The problem is that I don't know how many calls will be made, and I need to know when the job is done. One approach would be to detect when nothing has been left in the queue for 5 seconds or so.

How can I do that?

Upvotes: 1

Views: 256

Answers (3)

Roberto Zvjerković

Reputation: 10157

You don't really need to create your own streams.

Just start with a subject that emits once to trigger the first call:

const call$ = new BehaviorSubject(0);

call$.pipe(
    concatMap(call => {
        return callYourDatabase(call); // Pagination or whatever
    }),
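    tap(res => {
        if (res) {
            // Call again after completion if the previous call returned values.
            // Assumption: the emitted value is a page index, so the next call is current + 1.
            call$.next(call$.value + 1);
        }
    })
).subscribe(); // nothing runs until the pipeline is subscribed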

Upvotes: 0

After reading your comment about the flow, I think you don't need mergeMap, since each new request is made only after the previous one has completed. I suggest the following approach:

let {
  of,
  Subject
} = rxjs;
let {
  switchMap,
  delay,
  scan
} = rxjs.operators;

const db = [1, 2, 3, 4, 5, 6, 7];

const requester = new Subject();

const mockRequest = ({ page, items }) =>
  of(db.slice(page * items, (page + 1) * items)).pipe(delay(500));

const ITEMS_PER_PAGE = 2;
requester
  .pipe(
    switchMap(pageData => mockRequest(pageData)),
    scan(
      (acc, x) => {
        const newIdx = +acc[0] + 1;
        return [newIdx, x];
      },
      [0, []]
    )
  )
  .subscribe(([i, x]) => {
    if (x.length !== 0) {
      requester.next({ page: i, items: ITEMS_PER_PAGE });
      console.log(x);
    } else {
      console.log("Done and ready to unsubscribe");
    }
  });
requester.next({ page: 0, items: ITEMS_PER_PAGE });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.js"></script>

What I'm doing here is calling the DB for as long as it still returns items. In your case, you should request 100 elements at a time for as long as the DB keeps returning elements.
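
For comparison, the same "keep requesting pages until one comes back empty" loop can be sketched with RxJS's `expand` operator instead of feeding a Subject from inside `subscribe`. This is a different technique from the snippet above, not a restatement of it. `Page`, `pages` and `done` are illustrative names, and the synchronous `mockRequest` (without `delay`) is an assumption made to keep the sketch short.

```typescript
import { EMPTY, Observable, of } from "rxjs";
import { expand, takeWhile } from "rxjs/operators";

const db = [1, 2, 3, 4, 5, 6, 7];
const ITEMS_PER_PAGE = 2;

// Illustrative shape: each response remembers which page it belongs to.
interface Page {
  page: number;
  items: number[];
}

// Hypothetical stand-in for the real DB/server request.
const mockRequest = (page: number): Observable<Page> =>
  of({
    page,
    items: db.slice(page * ITEMS_PER_PAGE, (page + 1) * ITEMS_PER_PAGE),
  });

const pages: number[][] = [];
let done = false;

mockRequest(0)
  .pipe(
    // Request the next page as long as the previous one contained items.
    expand((res) => (res.items.length > 0 ? mockRequest(res.page + 1) : EMPTY)),
    // Drop the final empty page and complete the stream.
    takeWhile((res) => res.items.length > 0)
  )
  .subscribe({
    next: (res) => pages.push(res.items),
    complete: () => {
      done = true; // the job is finished here
    },
  });
```

With this shape the `complete` callback is the "job is done" signal, so no manual bookkeeping of the page index is needed in the subscriber.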

Upvotes: 0

Picci

Reputation: 17762

After reading your comment, I think the answer lies in calling complete on the Subject as soon as we know there is no more data to be read from the DB.

So, as a sort of pseudo-code, this could be a draft of the solution:

// callToServer is a function that calls the server and returns an Observable
const callToServer = (input: any): Observable<any> => httpClient.post(url, input);

const calls = new Subject<any>();
calls.pipe(
    // mergeMap accepts the concurrency level as its second parameter
    mergeMap(input => callToServer(input), 10)
).subscribe(
   {
      next: response => {
        // do something with the response
      },
      error: err => {
        // manage error occurrences
      },
      complete: () => {
        // here you come when the Subject completes
      }
   }
);

const cursor = db.readFromMyTable();

while(cursor still has data){
    const data = cursor.getNext(100);
    // for each record issue a next against the calls Subject
    data.forEach(d => calls.next(d));
}

// when there are no more records, complete the calls Subject
calls.complete();
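
A minimal self-contained sketch of this idea follows. It makes several assumptions: `callToServer` is a hypothetical synchronous stub rather than a real HTTP call, `batches` stands in for the pages read from the DB cursor, and `responses` and `finished` are illustrative names.

```typescript
import { Observable, Subject, of } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Hypothetical stand-in for the real server call: echoes a doubled value.
const callToServer = (input: number): Observable<number> => of(input * 2);

const responses: number[] = [];
let finished = false;

const calls = new Subject<number>();
calls
  .pipe(
    // The second argument limits how many inner calls run at the same time.
    mergeMap((input) => callToServer(input), 10)
  )
  .subscribe({
    next: (response) => responses.push(response),
    complete: () => {
      // Reached once the Subject has completed AND all pending calls are done.
      finished = true;
    },
  });

// Hypothetical batches standing in for successive reads from a DB cursor;
// an empty batch means there is no more data.
const batches: number[][] = [[1, 2, 3], [4, 5], []];
for (const batch of batches) {
  if (batch.length === 0) break;
  batch.forEach((d) => calls.next(d));
}

// No more records: completing the Subject lets the pipeline complete too.
calls.complete();
```

With real asynchronous calls, `complete` on the subscriber fires only after the last in-flight request has finished, which is what makes it usable as the "job is done" signal.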

Upvotes: 1
