Quentin

Reputation: 1933

RxJs forkJoin several observables in a row

UPDATE :

I have a problem that I cannot resolve.

In my code, I have a list of objects. For each object, I have to chain 3 requests one after another, not at the same time. The objects themselves, however, can be processed in parallel.

So I am using a forkJoin to run code once all the requests are complete. But the loop executes all the procedures and ignores the errors.


So I changed my code as shown below, and the 3 procedures now run in succession.

However, when an error occurs, the catchError does not stop the chain. The code runs procedure 1, then 2, then 3, even if 1 fails.

Procedure 3, this.store.proc3(input3, "ERR"), must still be executed after an error.

this.list.forEach(obj => {
    var input1 = obj...;
    var input2 = obj...;
    var input3 = obj...;
    
    var obs = this.store.proc1(input1).pipe(
        catchError(err1 => this.store.proc3(input3, "ERR")),
        concatMap(res1 => this.store.proc2(input2).pipe(
            catchError(err2 => this.store.proc3(input3, "ERR")),
            concatMap(res2 => this.store.proc3(input3, "OK"))
        ))
    );
    _obs$.push(obs);
});

forkJoin(_obs$).subscribe(
    results => {
        if(results) {
            this._dialogRef.close(true);
            // ...
        }
    }
);

CURRENT CASE :

  • proc1 OK -> proc2 OK -> proc3(OK)
  • proc1 OK -> proc2 ERR -> proc3(OK)
  • proc1 ERR -> proc2 ERR -> proc3(OK)

DESIRED CASE :

  • proc1 OK -> proc2 OK -> proc3(OK)
  • proc1 OK -> proc2 ERR -> proc3(ERR)
  • proc1 ERR -> proc3(ERR)

INFOS :

  • proc1 -> returns true OR throws an exception
  • proc2 -> returns true OR throws an exception
  • proc3 -> returns an object (lets you change the status of the object)

Does anyone have a solution? I'm not familiar with RxJS.

Upvotes: 0

Views: 730

Answers (1)

Barremian

Reputation: 31145

The question isn't clear at the moment. If you wish to make 3 sequential calls for each element in the array, you could chain them using concatMap. And if the 3 calls for all the elements can run in parallel, the whole thing can be enclosed in a forkJoin.

I am not yet sure what value you expect in the subscription though. The following code will give an array of responses from procedure3 for each element.

forkJoin(
  this.list.map(obj =>                           // JS `Array#map`
    this.store.procedure1(obj).pipe(             // 1st call
      concatMap(_ =>
        this.store.procedure2(obj).pipe(         // 2nd call
          concatMap(result =>
            iif(
              () => !!result,                    // conditional 3rd call using RxJS `iif`
              this.store.procedure3(this.var),
              this.store.procedure3(this.var)    // <-- same calls for both conditions?
            )
          )
        )
      )
    )
  )
).subscribe(
  ...
);

Update: include catchError

You're almost there. In RxJS, the order of the piped operators is very important. In your case, the catchError sits above the concatMaps, and since it converts the error into a next emission, it triggers the concatMaps that follow. The solution is to move the catchError blocks below the concatMaps.
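To see that effect in isolation, here is a minimal sketch. It assumes RxJS 7 and uses hypothetical inline observables in place of the store calls. The value that catchError substitutes for the error reaches the next concatMap as if it were a normal result:

```typescript
import { of, throwError } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';

// Collects whatever the pipeline emits (hypothetical helper for this demo).
const seen: string[] = [];

// Stand-in for a failing proc1: errors immediately on subscribe.
throwError(() => new Error('proc1 failed')).pipe(
  // catchError placed BEFORE concatMap: the error is replaced by a value...
  catchError(() => of('recovered')),
  // ...so this concatMap still runs, with the replacement value as input.
  concatMap(value => of(`next step got: ${value}`))
).subscribe(v => seen.push(v));

// seen is now ['next step got: recovered']
```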

Try the following

this.list.forEach(obj => {
    var input1 = obj...;
    var input2 = obj...;
    var input3 = obj...;
    
    var obs = this.store.proc1(input1).pipe(
        concatMap(res1 => this.store.proc2(input2).pipe(
            concatMap(res2 => this.store.proc3(input3, "OK")),
            catchError(err2 => this.store.proc3(input3, "ERR"))
        )),
        catchError(err1 => this.store.proc3(input3, "ERR")),
    );
    _obs$.push(obs);
});
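
To check the three desired cases, here is a small self-contained sketch of the same operator ordering. It assumes RxJS 7. The `step`, `proc3` and `chain` functions are hypothetical stand-ins for the `this.store` methods, assumed to emit once and complete, or to error:

```typescript
import { Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';

type Status = 'OK' | 'ERR';

// Hypothetical stand-in for proc1/proc2: emits true, or errors when `fail` is set.
const step = (fail: boolean): Observable<boolean> =>
  fail ? throwError(() => new Error('failed')) : of(true);

// Hypothetical stand-in for proc3: reports the status it was called with.
const proc3 = (id: number, status: Status): Observable<string> =>
  of(`${id}:${status}`);

// Same ordering as above: each catchError sits BELOW the concatMap it guards.
function chain(id: number, fail1: boolean, fail2: boolean): Observable<string> {
  return step(fail1).pipe(
    concatMap(() => step(fail2).pipe(
      concatMap(() => proc3(id, 'OK')),
      catchError(() => proc3(id, 'ERR'))   // proc2 failed
    )),
    catchError(() => proc3(id, 'ERR'))     // proc1 failed, proc2 is skipped
  );
}

let results: string[] = [];
forkJoin([
  chain(1, false, false),  // proc1 OK  -> proc2 OK  -> proc3(OK)
  chain(2, false, true),   // proc1 OK  -> proc2 ERR -> proc3(ERR)
  chain(3, true, false)    // proc1 ERR -> proc3(ERR)
]).subscribe(r => { results = r; });

// results: ['1:OK', '2:ERR', '3:ERR']
```

Note that with this ordering the inner catchError also catches an error thrown by proc3(input3, "OK") itself, in which case proc3 is called a second time with "ERR".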

Upvotes: 1
