Maksym Petrenko
Maksym Petrenko

Reputation: 1063

rxjs/Observable: Execute functions one by one and pass parameters of functions as result of previous function

I have an array with a queue of services and I need to run this services one by one and pass result of the first function as parameters for the second one. Services can be synchronous and asynchronous. Result of service can be resolve, reject or some result value. If one of service in queue fail should fail all queue and don`t run next services in queue.

Currently, I have an implementation with Promises

let collection = [
 { name: 'calcOnServer', parameters: {} },
 { name: 'calc', parameters: {} }
];

return collection.reduce((currentFunction, nextFunction, index) => {
  return currentFunction.then(() => {
    let result = runFunction(nextFunction.name, nextFunction.parameters);
    // runFunction is some method that can get class with name from first parameter and execute for it method 'run' with parameters from second parameter


    if (result === undefined) {
      result = Promise.resolve();
    }

    if (!result.then) {
      if (Boolean(result)) {
        result = Promise.resolve(result);
      } else {
        result = Promise.reject();
      }
    }

    return result.then((result) => {
      collection[index + 1].parameters = result;
    });
  });
}, Promise.resolve())

Currently services can look like

class calcOnServer {
  run({param1, param2}) {
    return new Promise((resolve, reject) => {
        // some async operation
       .then(resolve, reject);
    }
  }
}

class calc {
  run({param1, param2}) {
    if (typeof param1 === 'number' && typeof param2 === 'number') {
      return param1 + param2
    } else {
      return Promise.reject();
    }
  }
}

I need to rewrite this logic with RxJS/Observables.

Upvotes: 3

Views: 1579

Answers (1)

martin
martin

Reputation: 96891

I'm not completely sure about what your code is supposed to be doing but in general when you want to iterate an array and run the same async operation over all of them you can use mergeScan operator. It also accepts an optional concurrency parameter that tells it how many parallel operation you want to run:

import { of, from } from 'rxjs'; 
import { map, mergeScan, delay } from 'rxjs/operators';

const collection = [1, 2, 3, 4, 5];

const source = from(collection).pipe(
  mergeScan((acc, value) => {
    console.log(acc, value);
    return of(value).pipe( // Your async operation will be here
      delay(1000),
      map(response => `response-${response}`)
    );
  }, null, 1), // `1` will make `mergeScan` process items one by one
);

source.subscribe();

This example will print the following output. Notice, that each log contains a response from the previous call and the current value:

null 1
response-1 2
response-2 3
response-3 4
response-4 5

Live demo: https://stackblitz.com/edit/rxjs-3jsae2

Upvotes: 1

Related Questions