nicojs
nicojs

Reputation: 2055

How to chain a list of promises in RXJS?

How can i chain a list of promises in RXJS? Every promise needs to be executed when the previous is resolved (work todo is stateful).

The way i'm doing it now feels primitive:

const workTodo = []; // an array of work
const allWork = Observable.create(observer => {
  const next= () => {
    const currentTodo = workTodo.shift();
    if (currentTodo ) {
      doTodoAsync(currentTodo)
        .then(result => observer.onNext(result))
        .then(next);
    } else {
      observer.onCompleted();
    }
  };
  next();
});

I was thinking something like this:

const workTodo = []; // an array of work
const allWork = Observable
                  .fromArray(workTodo)
                  .flatMap(doTodoAsync);

But that basically executes all promises at once.

Upvotes: 1

Views: 897

Answers (2)

artur grzesiak
artur grzesiak

Reputation: 20348

It seems you were pretty close with your attempt.

You may either specify maximum concurrency of 1 for .flatMap like:

Observable.fromArray(workTodo)
 .flatMap(doTodoAsync, 1)

or equivalently use .concatMap instead of .flatMap:

Observable.fromArray(workTodo)
 .concatMap(doTodoAsync)

I would use concatMap as it feels more idiomatic.

UPDATE: DEMO

Upvotes: 1

CozyAzure
CozyAzure

Reputation: 8468

How about some recursion?

First create a recursive function and call it recursiveDoToDo:

const recursiveDoToDo = (currentTodo, index) =>
    Observable
        .fromPromise(doTodoAsync(currentTodo))
        .map(resolved => ({resolved, index}));

The code above simply wraps your doTodoAsync into an Observable, and then we map the results to return the resolved promise and the index of the array, for recursion use later.

Next, we will recursively call the recursiveDoToDo with the .expand() operator.

recursiveDoToDo(worktodo[0], 0)
    .expand(res => recursiveDoToDo(worktodo[res.index + 1], res.index + 1))
    .take(worktodo.length)

All you need to do for your recursion is just to increment the index by 1. Because .expand() will recursively run forever, the .take() operator is there to tell the observable when to end the stream, which is the length of your worktodo.

Now you can simply subscribe to it:

recursion.subscribe(x => console.log(x));

Here is the working JS Bin

Upvotes: 0

Related Questions