Reputation: 821
I am working with Angular and NodeJs (with Axios) using RxJs, and currently find it very challenging to achieve this task. I will explain my problem based on a scenario.
I have an array of objects like this, with possibly even more than 100 objects:
let objArr = [{name: 'john', id: '123', country: 'usa'}, {name: 'doe', id: '456', country: 'china'}....]
Then I have another 4-5 validation APIs that can be called for different params e.g. id, name and country based on each object:
api_1 = "validate_step_1/:id"
api_2 = "validate_step_2/:id/:name"
api_3 = "validate_step_3/:id/:country"
api_4 = "validate_step_4/:id:/:name/:country"
These API calls should strictly happen one after another in a sequential pattern, e.g. api_2 should only be called if api_1 returns true and so on.
What I want:
I would like to execute for-loop on the array that should run in parallel, and each object should then sequentially validate itself based on these 4 API calls. Something like sequential API calls based on each item in for-loop in parallel for all 100 objects.
Is this even possible? Also, any solutions to achieve this on the Node side are also welcomed.
What I tried
Right now I am using this method, but it's very slow, even sometimes resulting in timeout errors in Axios:
of(...this.objArr).pipe(
concatMap((obj: any) => this.service.api_1(id)),
concatMap((obj: any) => this.service.api_2(id, name)),
concatMap((obj: any) => this.service.api_3(id, country)),
concatMap((obj: any) => this.service.api_4(id, name, country)),
catchError( error => this.handleError(error))
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
)
Upvotes: 1
Views: 3274
Reputation: 629
Switch map your data array into a stream of parallel pipes that join together.
const dataArray: myData[];
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = arrayOfObjects.map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
return concat(test1, test2, test3, test4).pipe(
catchError(error => this.handleError(error))
)
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(nParallelPipes).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
nParallelPipes
contains n synchronous sequential validation pipes, where n is the amount of data in your initial array. ForkJoining them together (or combineLatest) will fire them off in parallel.
I hope that helps.
Happy coding.
Is it possible to apply certain conditional checks on the response of API before moving to the next?
Yes. It is possible.
const dataArray: string[];
const checkTest1 = (value) => true;
const checkTest2 = (value) => true;
const checkTest3 = (value) => true;
const checkTest4 = (value) => true;
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = (arrayOfObjects as string[]).map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
const startTesting = concatMap(value => iif(() => checkTest1(value), test1Passed, throwError('T1 Failed')))
const test1Passed = test2.pipe(concatMap(value => iif(() => checkTest2(value), test2Passed, throwError('T2 Failed'))));
const test2Passed = test3.pipe(concatMap(value => iif(() => checkTest3(value), test3Passed, throwError('T3 Failed'))));
const test3Passed = test4.pipe(concatMap(value => iif(() => checkTest4(value), of('success'), throwError('T4 Failed'))))
return test1.pipe(startTesting);
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(parallelCalls).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
Upvotes: 2
Reputation: 17762
I would adopt this approach.
First of all create a function that build an Observable that runs all the validations in parallel and handle any error we may encounter (more on errors later). Something like this
function validate(id, name, country) {
return concat(
this.service.api1(id),
this.service.api2(id, name),
this.service.api3(id, country),
this.service.api4(id, name, country)
).pipe(
catchError(err => of(err.message))
);
}
Then I would use the from
function from the rxjs library to turn the array of objects into a stream and then apply the mergeMap
operator to launch all the validations in parallel, like this
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country)))
.subscribe((v) => console.log(v));
Why use mergeMap
over forkJoin
in this case. The main reason is that with mergeMap
you can control the level of concurrency you want to have. If you do not specify anything, all the validations run in parallel. But, for instance, if you want to limit the number of parallel validations, you can use the optional concurrent
parameter of mergeMap
like this
const concurrent = 3 // whatever limit
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country), concurrent))
.subscribe((v) => console.log(v));
If you want to proceed in the validations only if a certain criteria is met, you can simply use the tap
operator and throw
an error if the condition is not met. The error will be caught by the catchError
operator which we have added at the end of the pipe
in the validate
function. So, a validation API would look like this
api3(id, country) {
return this.invokeApi3(id, country).pipe(
tap(() => {
if (// check the criteria for which you do not want to continue) {
throw new Error('Error in API 3');
}
})
);
}
You can look at this stackblitz for an examp.le
Upvotes: 1
Reputation: 1
you can use forkJoin
operator to call multiple endpoints in case you're only interested in the final response of all the endpoints and no more future emissions, it takes an array of observables and gives tou an array of responses then completes...something like this:
const obs1$ = this.service.api1;
const obs2$ = this.service.api2;
const subscription = forkJoin([obs1$,obs2$].subscribe(([response1, response2]) => console.log(response1, response2))
but in case you'll be expecting more emissions from one or more observables you can use combineLatest
or preferably zip
operator.
Upvotes: 0