Robby Cornelissen
Robby Cornelissen

Reputation: 97302

Preventing premature completion of an async pipeable operator in RxJS

I'm creating pipeable operators using RxJS 6, and am unclear about how to complete() the observer when the operation is asynchronous.

For a synchronous operation, the logic is simple. In the example below, all values from the source Observable will be passed to observer.next(), and after that observer.complete() is called.

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

For an asynchronous operation, however, I'm a bit at a loss. In the example below, the asynchronous operation is represented by a call to setTimeout(). Obviously, observer.complete() will be called before any of the values are passed to observer.next().

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

So the question is: what is the idiomatic RxJS approach to make it so that the call to observer.complete() is only made after all values are asynchronously passed to observer.next()? Should I be manually keeping track of pending calls or is there a more "reactive" solution?

(Note that the example above is a simplification of my actual code, and that the call to setTimeout() is meant to represent "any asynchronous operation". I'm looking for a general approach to dealing with async operations in pipeable operators, not advice on how to deal with delays or timeouts in RxJS.)

Upvotes: 2

Views: 396

Answers (3)

Picci
Picci

Reputation: 17762

One line of thought could be to restructure your asyncOp to use other operators such as mergeMap.

This is the code that reproduces your example using this approach

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

Whether this is something worth considering depends on what your asyncOp does. If it is asynchronous because it relies on some callback, like in case of https calls or reads from file system, than I think this approach can work since you can turn a callback based function into an Observable.

Upvotes: 3

kctang
kctang

Reputation: 11202

I created this runnable StackBlitz demo to show what i think should be done.

The idea here is to use toArray() to get all the values from source observable into an array. The code after toArray() is a single value (array).

Note: There are many ways (operators) to solve a problem, this is just an example based on what I understand from this question - that is both a good & bad thing about RxJS Observables. Hope this helps. :-)

Main demo code is:

// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
  // let each value be processed by both async service...
  concatMap(no => myAsyncService$(no)),
  concatMap(no => myAsyncService2$(no)),

  // --- toArray() combines all the values (i.e. they completed)
  toArray(),

  // --- this will only be called once - with all completed values
  // --- testing: try commenting the toArray() to see the values as individual "next" value
  tap(val => {
    // see the combined values
    console.log(val)
  })
).subscribe();

Upvotes: 0

Robby Cornelissen
Robby Cornelissen

Reputation: 97302

Still hoping to get input on a more reactive/idiomatic implementation, but below is what I decided to go with for the time being.

In essence, I'm just using a counter for in-flight operations (pending) and made it so that the operator completes only when the source observable completes (completed) and there are no pending operations (!pending).

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    let pending = 0; // the number of in-flight operations
    let completed = false; // whether or not the source observable completed
    
    return source.subscribe({
      next: (x) => {
        pending++;
        
        setTimeout(() => {              
          observer.next(x);
          
          if (!--pending && completed) { // no ops pending and source completed
            observer.complete();
          }
        }, 100);
      },
      error: (e) => observer.error(err),
      complete: () => {
        completed = true;
        
        if (!pending) { // no ops pending
          observer.complete();
        }
      }
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

Upvotes: 1

Related Questions