Reputation: 45775
I wish the answer was forkJoin
/ Promises.all
, but it's a bit more, please bear with me.
I have a source of promises, that can arrive in random order, and I need some way to say "when all promises that arrived so far are done, let me know".
In a Promise based solution, I thought initially of using Promise.all
but promises can be still "arriving" while others haven't completed. Interestingly there is a neat workaround for an "iterable Promise.all" at https://stackoverflow.com/a/37819138/239168
I'm trying to do this the Rx way. After reading the docs for a little, I think forkJoin
is the Promise.all
equivalent, however, same problem, there is no point in time where I can safely call forkJoin
or Promise.all
as there can always be one more added while another is still pending... Since I probably make no sense by now, I thought I'll ask for some guidance.
Setup
(hold your laughs if it's silly, I'm new to Rx...)
I have a Subject, and I want to know when all of the promises in it are complete... also it can always get new added promises at any time...
private promiseSource = new Subject<Promise<any>>();
promises$ = this.promiseSource.asObservable();
Every time a new promise "arrives", I'm just adding it to the subject
this.promiseSource.next(somePromise);
What I would like to magically happen is - have the subject "complete" whenever it holds only completed promises.
e.g.
promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({
next: x => ...,
error: err => ...,
complete: () => {
console.log('all promises we got so far are done');
// nice to have, I want this to keep "listening" for new promises
promiseSource.youAreNotREALYCompletePleaseReset();
}
});
Or in other words, I have an observable of async actions, if we take a look at the content, we can see overlapping async actions, I want to know when there is no overlap e.g.
|<-async action 1->| |<-async action 3->|
|<-async action 2->| |<-async action 4->|
/\ /\
find this gap
if these were for example http calls, I'm asking basically - tell me when there are no open http calls.
tl;dr
how to implement this Promises based answer in an RxJS world...
https://stackoverflow.com/a/37819138/239168
Upvotes: 2
Views: 122
Reputation: 58430
If I'm interpreting your question correctly, you are only interested in a signal that indicates whether or not there are pending promises.
It's pretty easy to use merge
and scan
to create an observable that emits a count of pending promises and, from that, you should be able to create whatever signals you like.
Basically, every time the subject emits a promise, the count of pending promises should be incremented. And each time one of those promises resolves, the count can be decremented.
const promises = new Rx.Subject();
const pendingCount = Rx.Observable
.merge(
promises.mapTo(1),
promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1))
)
.scan((acc, value) => acc + value, 0)
.do(count => console.log(`${count} pending promise(s)`));
const doneSignal = pendingCount
.filter(count => count === 0)
.mapTo("done");
doneSignal.subscribe(signal => console.log(signal));
const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay));
promises.next(timeoutPromise(200));
setTimeout(() => promises.next(timeoutPromise(200)), 100);
setTimeout(() => promises.next(timeoutPromise(200)), 300);
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Upvotes: 2
Reputation: 2796
I can think of a fairly straightforward way of doing this based on a previous answer. You could use fromPromise
to turn your Subject<Promise<any>>
into a Subject<Observable<any>>
, and then you could use the active
function described in this answer to reduce that down to an observable of active observables. Once you've got that, you can phrase your query as "when the array of active streams becomes empty", which can be done with a simple filter, e.g.:
active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => {
// here we are, all complete
});
This will trigger each time the number of active streams transitions to zero, so if you only want the first time, place a .take(1)
or .first
between the filter and the subscribe.
Probably not the prettiest solution, but it's conceptually simple.
Upvotes: 1