Reputation: 1063
I have an array with a queue of services and I need to run this services one by one and pass result of the first function as parameters for the second one. Services can be synchronous and asynchronous. Result of service can be resolve, reject or some result value. If one of service in queue fail should fail all queue and don`t run next services in queue.
Currently, I have an implementation with Promises
let collection = [
{ name: 'calcOnServer', parameters: {} },
{ name: 'calc', parameters: {} }
];
return collection.reduce((currentFunction, nextFunction, index) => {
return currentFunction.then(() => {
let result = runFunction(nextFunction.name, nextFunction.parameters);
// runFunction is some method that can get class with name from first parameter and execute for it method 'run' with parameters from second parameter
if (result === undefined) {
result = Promise.resolve();
}
if (!result.then) {
if (Boolean(result)) {
result = Promise.resolve(result);
} else {
result = Promise.reject();
}
}
return result.then((result) => {
collection[index + 1].parameters = result;
});
});
}, Promise.resolve())
Currently services can look like
class calcOnServer {
run({param1, param2}) {
return new Promise((resolve, reject) => {
// some async operation
.then(resolve, reject);
}
}
}
class calc {
run({param1, param2}) {
if (typeof param1 === 'number' && typeof param2 === 'number') {
return param1 + param2
} else {
return Promise.reject();
}
}
}
I need to rewrite this logic with RxJS/Observables.
Upvotes: 3
Views: 1579
Reputation: 96891
I'm not completely sure about what your code is supposed to be doing but in general when you want to iterate an array and run the same async operation over all of them you can use mergeScan
operator. It also accepts an optional concurrency
parameter that tells it how many parallel operation you want to run:
import { of, from } from 'rxjs';
import { map, mergeScan, delay } from 'rxjs/operators';
const collection = [1, 2, 3, 4, 5];
const source = from(collection).pipe(
mergeScan((acc, value) => {
console.log(acc, value);
return of(value).pipe( // Your async operation will be here
delay(1000),
map(response => `response-${response}`)
);
}, null, 1), // `1` will make `mergeScan` process items one by one
);
source.subscribe();
This example will print the following output. Notice, that each log contains a response from the previous call and the current value:
null 1
response-1 2
response-2 3
response-3 4
response-4 5
Live demo: https://stackblitz.com/edit/rxjs-3jsae2
Upvotes: 1