RxJS and Angular HttpClient: use of forkJoin for multiple sequential requests

In RxJS (ES6) I'm trying to get the result of a series of sequential operations in a single Observable.

I'm not sure whether I should use forkJoin (but I would like the operations to run sequentially) or the concat operator (but I would like to be notified only once, at the end, when all of them have finished).

I TRIED:

forkJoin

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

concat

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

In both cases the observables returned by HttpClient never do their job (no HTTP request is sent), although I can see the logging.

The method called for each item in the batch is the following:

  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

I call the sync() function as follows:

this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

Output:

(8) calling...
sync done

but no HTTP request is started.

If I do the same outside the forkJoin/concat in the piped map, I can see the HTTP request being sent.

sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

What am I missing?

--- UPDATE - SOLUTION ---

sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );
}
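
A note on the "notified only at the end" part of the question: concat(...batch) subscribes to each request only after the previous one has completed, but it emits once per request, so the next callback runs for every item. A minimal sketch of collapsing that into a single notification with toArray() follows. fakeRequest is a hypothetical stand-in for this.api.updateProduct(), not part of the original code.

```typescript
import { Observable, concat, defer, of } from 'rxjs';
import { toArray } from 'rxjs/operators';

// Hypothetical stand-in for an HttpClient call: nothing happens until
// someone subscribes, and the subscription order is recorded.
const order: string[] = [];
function fakeRequest(name: string): Observable<string> {
  return defer(() => {
    order.push(name);
    return of(`${name} ok`);
  });
}

const batch = [fakeRequest('a'), fakeRequest('b'), fakeRequest('c')];

// Without toArray(): next is called once per request.
let perRequest = 0;
concat(...batch).subscribe(() => perRequest++);

// With toArray(): next is called once, after the last request completes,
// with all results collected in order.
const summaries: string[][] = [];
concat(...batch)
  .pipe(toArray())
  .subscribe(results => summaries.push(results));
```

The spread matters as well: as far as I can tell, concat(batch) without the spread treats the array as a single input and emits the inner Observables themselves as values, which would explain why the earlier concat attempt never subscribed to them.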

Upvotes: 0

Views: 3779

Answers (2)

AliF50

Reputation: 18809

Try

sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
      return batch;
    }),
    switchMap(batch => forkJoin(batch)),
  );
}

This assumes this.db.getProducts() returns synchronous static data rather than an observable or other asynchronous source.

Then you can try doing

  this.productService.sync().subscribe(() => { console.log('sync done'); });

And see if any API calls are being made.
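
One thing to watch with forkJoin: it emits a single array once every input has completed, but if the batch is empty it completes without emitting at all, so a next callback such as the 'sync done' log would never run. A small sketch of both cases, using of() values as hypothetical stand-ins for the HTTP responses and defaultIfEmpty as one possible guard:

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

// Non-empty batch: one emission containing every result.
const emissions: string[][] = [];
forkJoin([of('a ok'), of('b ok')]).subscribe(results => emissions.push(results));

// Empty batch: completes immediately, next is never called.
const none: Observable<string>[] = [];
let emptyNext = 0;
let emptyComplete = false;
forkJoin(none).subscribe({
  next: () => emptyNext++,
  complete: () => { emptyComplete = true; },
});

// Guard: emit an empty array when there was nothing to sync.
const guarded: string[][] = [];
forkJoin(none)
  .pipe(defaultIfEmpty([] as string[]))
  .subscribe(results => guarded.push(results));
```

Whether an empty result should count as "sync done" is a design choice for the caller; the guard only makes the outcome explicit.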

Upvotes: 2

andriishupta

Reputation: 577

Your problem here is not forkJoin/concat; it is using map to return another observable.

In your code, map returns Observable<Observable<any>>. Your IDE should highlight this (avoid typing it as any, which may be hiding the problem here).

The solution is to change map to concatMap (at least as a test), after which the rest of the chain should work.

Also, debug the subscribe call and the value it receives to see what you actually have.
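
To illustrate the difference, here is a minimal sketch. fakeUpdate is a hypothetical stand-in for this.api.updateProduct(): it logs when called, but the simulated request only runs on subscription, as with HttpClient. products$ stands in for from(this.db.getProducts()).

```typescript
import { concat, defer, of, Observable } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// Hypothetical stand-in for updateProduct(): the log runs on every call,
// the "request" runs only when the returned Observable is subscribed.
const sent: number[] = [];
function fakeUpdate(id: number): Observable<string> {
  console.log('calling...');
  return defer(() => {
    sent.push(id); // simulates the HTTP request going out
    return of(`updated ${id}`);
  });
}

// Hypothetical stand-in for from(this.db.getProducts()).
const products$ = of([1, 2, 3]);

// map: the inner Observable is passed along as a plain value and is never
// subscribed, so 'calling...' is logged but nothing is sent.
const withMap: unknown[] = [];
products$
  .pipe(map(ids => concat(...ids.map(id => fakeUpdate(id)))))
  .subscribe(value => withMap.push(value));
const sentAfterMap = sent.length;

// concatMap: the inner Observable is subscribed and its values are
// flattened into the outer stream, so the requests actually run.
const withConcatMap: string[] = [];
products$
  .pipe(concatMap(ids => concat(...ids.map(id => fakeUpdate(id)))))
  .subscribe(value => withConcatMap.push(value));
```

With map, the subscriber receives one value that is itself an Observable, which matches the symptom in the question: the logging appears and the callback fires, yet no request is made.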

Upvotes: 0
