Reputation: 1261
In RxJS (ES6) I'm trying to get the result in a sequentially series of operations in a single Observable.
I'm not sure If I have to use a forkJoin (but I would like the operations to be executed sequentially) or a concat operator (but I would like to just be notified at the end when all of them are executed).
I TRIED:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
concat
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
In both cases I can't ever see the observables coming from the HttpClient
doing their job (no http request sent), but I can see the logging.
The called method in the batch it's the following:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
I call the sync() function like the following:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
Output:
(8) calling...
sync done
but no HTTP request starting.
If I do the same outside the forkJoin/concat in the pipe map I can see the HTTP request sent.
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
What am I missing?
--- UPDATE - SOLUTION ---
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);
Upvotes: 0
Views: 3779
Reputation: 18809
Try
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
const batch: Observable<any>[] = [];
for (const product of products) {
batch.push(this.api.updateProduct(product));
}
return batch;
}),
switchMap(batch => forkJoin(batch)),
)
}
This is assuming this.db.getProducts()
is synchronous static data and not an observable/asynchronous data.
Then you can try doing
this.productService.sync().subscribe(() => { console.log('sync done'); });
And see if any API calls are being made.
Upvotes: 2
Reputation: 577
Your problem is not forkJoin/concat
here, it is using map
to return another observable.
In your code, map
would return Observable<Observable<any>>
. Your IDE should highlight this stuff(don't use any
, maybe it is a problem here).
Your solution would be to change map
to concatMap
(at least for test) and next chain should work.
Also, debug subscribe
and its return value to see what you have.
Upvotes: 0