Eran Medan
Eran Medan

Reputation: 45775

An observable (Subject) of async actions that completes when there are no overlapping async actions in it

I wish the answer was forkJoin / Promises.all, but it's a bit more, please bear with me.

I have a source of promises, that can arrive in random order, and I need some way to say "when all promises that arrived so far are done, let me know".

In a Promise based solution, I thought initially of using Promise.all but promises can be still "arriving" while others haven't completed. Interestingly there is a neat workaround for an "iterable Promise.all" at https://stackoverflow.com/a/37819138/239168

I'm trying to do this the Rx way. After reading the docs for a little, I think forkJoin is the Promise.all equivalent, however, same problem, there is no point in time where I can safely call forkJoin or Promise.all as there can always be one more added while another is still pending... Since I probably make no sense by now, I thought I'll ask for some guidance.

Setup

(hold your laughs if it's silly, I'm new to Rx...)

I have a Subject, and I want to know when all of the promises in it are complete... also it can always get new added promises at any time...

private promiseSource = new Subject<Promise<any>>();
promises$ = this.promiseSource.asObservable();

Every time a new promise "arrives", I'm just adding it to the subject

this.promiseSource.next(somePromise);

What I would like to magically happen is - have the subject "complete" whenever it holds only completed promises.

e.g.

promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({
  next: x => ...,
  error: err => ...,
  complete: () => {
     console.log('all promises we got so far are done');
     // nice to have, I want this to keep "listening" for new promises
     promiseSource.youAreNotREALYCompletePleaseReset(); 
  }
});

Or in other words, I have an observable of async actions, if we take a look at the content, we can see overlapping async actions, I want to know when there is no overlap e.g.

|<-async action 1->|   |<-async action 3->|
          |<-async action 2->|                     |<-async action 4->|

                                          /\      /\
                                        find this gap

if these were for example http calls, I'm asking basically - tell me when there are no open http calls.

tl;dr

how to implement this Promises based answer in an RxJS world...

https://stackoverflow.com/a/37819138/239168

Upvotes: 2

Views: 122

Answers (2)

cartant
cartant

Reputation: 58430

If I'm interpreting your question correctly, you are only interested in a signal that indicates whether or not there are pending promises.

It's pretty easy to use merge and scan to create an observable that emits a count of pending promises and, from that, you should be able to create whatever signals you like.

Basically, every time the subject emits a promise, the count of pending promises should be incremented. And each time one of those promises resolves, the count can be decremented.

const promises = new Rx.Subject();

const pendingCount = Rx.Observable
  .merge(
    promises.mapTo(1),
    promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1))
  )
  .scan((acc, value) => acc + value, 0)
  .do(count => console.log(`${count} pending promise(s)`));

const doneSignal = pendingCount
  .filter(count => count === 0)
  .mapTo("done");

doneSignal.subscribe(signal => console.log(signal));

const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay));

promises.next(timeoutPromise(200));
setTimeout(() => promises.next(timeoutPromise(200)), 100);
setTimeout(() => promises.next(timeoutPromise(200)), 300);
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Upvotes: 2

Matt Burnell
Matt Burnell

Reputation: 2796

I can think of a fairly straightforward way of doing this based on a previous answer. You could use fromPromise to turn your Subject<Promise<any>> into a Subject<Observable<any>>, and then you could use the active function described in this answer to reduce that down to an observable of active observables. Once you've got that, you can phrase your query as "when the array of active streams becomes empty", which can be done with a simple filter, e.g.:

active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => {
    // here we are, all complete
});

This will trigger each time the number of active streams transitions to zero, so if you only want the first time, place a .take(1) or .first between the filter and the subscribe.

Probably not the prettiest solution, but it's conceptually simple.

Upvotes: 1

Related Questions