Reputation: 67
I have a very large list of ordered observables that need to run in parallel. When each observable returns, it appends its result to a BehaviorSubject, which is how the results are passed on. However, I need a specific function to be called when they have all completed.
The observables each download an image (and related metadata) from an API. The requests need to be executed as fast as possible; I need to work with each result as it emits, and an empty value should be emitted when all the observables are finished. This means the observables should be executed in parallel.
The original implementation, without a callback when they complete:
const requests: Observable<ImageResponse>[] = getRequests();

requests.forEach(obs => obs.subscribe(res => {
  const currentImages = this.$images.value;
  currentImages.push(res);
  this.$images.next(currentImages);
}));
In order to implement the callback when all requests have completed, I have tried the following.
const requests: Observable<ImageResponse>[] = getRequests();
const finishedTracker = new Subject<void>();

requests.forEach(obs => obs.subscribe(res => {
  const currentImages = this.$images.value;
  currentImages.push(res);
  this.$images.next(currentImages);
}));

forkJoin(requests).subscribe(() => {
  finishedTracker.next();
  finishedTracker.complete();
  console.log('requests done');
});
This works, but it seems strange to me that I need to split up the forkJoin and the subscriptions to the requests. Is there a better way to implement this functionality? I looked at mergeMap as well, but was not able to make it work.
Edit: Based on comments, I have realised that subscribing twice means making the requests twice. Therefore I have attempted another implementation.
from(requests).pipe(
  mergeMap(o => {
    o.subscribe(res => {
      const currentImages = this.$images.value;
      currentImages.push(res);
      this.$images.next(currentImages);
    });
    return o;
  }, 10)
).subscribe(() => {
  finishedTracker.next();
  console.log('requests done');
});
I did not use the result from forkJoin because, as far as I understand, it gives me the results of all the requests at once, so it needs to wait for them all to finish. Since each request is relatively fast, but there are often hundreds of them, I need each individual result passed to the BehaviorSubject as soon as that request finishes.
Edit 2 The solution I went with.
from(requests).pipe(
  mergeMap(request => request, 10),
  scan<ImageResponse, ImageResponse[]>((all, current) => {
    all = all.concat(current);
    this.$images.next(all);
    return all;
  }, [])
).subscribe({
  complete: () => {
    finishedTracker.next();
    console.log('requests done');
  }
});
Upvotes: 3
Views: 2166
Reputation: 14740
It's not necessary to subscribe inside of your mergeMap. In fact, as others have pointed out, it causes a double subscription, since mergeMap internally subscribes to the observable returned by the function you pass to it.
To handle the responses as they occur, you can use a pipe and add your handling logic inside. Since you are essentially performing a side effect (something that doesn't modify the output of the current stream), the tap operator is appropriate:
from(requests).pipe(
  mergeMap(o => o.pipe(
    tap(res => {
      const currentImages = this.$images.value;
      currentImages.push(res);
      this.$images.next(currentImages);
    })
  ), 10)
).subscribe(() => {
  finishedTracker.next();
  console.log('requests done');
});
While this will work, it looks like you are overcomplicating the observable flow. I'm not exactly sure of your use case, but I'd guess the Subjects aren't really needed at all. If your goal is to emit a cumulative array of results as they are processed, you can use scan for this without involving any Subject or BehaviorSubject. To run some logic when all requests have completed, you can pass a partial Observer that specifies only the complete callback (instead of the next callback, which is implicitly used when you pass a function as an argument to subscribe()):
from(requests).pipe(
  mergeMap(request => request, 10),
  scan((all, current) => all.concat(current), [])
).subscribe({
  complete: () => console.log('requests done')
});
EDIT: As pointed out by @AdrianBrand, it's more concise to use merge instead of from/mergeMap:
merge(...requests, 10).pipe(
  scan((all, current) => all.concat(current), [])
).subscribe({
  complete: () => console.log('requests done')
});
Upvotes: 2
Reputation: 21638
Here is an expand-based approach that takes an accumulator holding the remaining requests and the responses collected so far.
const { of, expand, takeWhile, map, delay } = rxjs;

const requests = Array.from({ length: 10 }, (_, i) =>
  of(`Response ${i + 1}`).pipe(delay(Math.random() * 500))
);

const responses$ = of({ requests, responses: [] }).pipe(
  expand((acc) =>
    acc.requests[0].pipe(
      map((response) => {
        const [, ...remainingRequests] = acc.requests;
        return { requests: remainingRequests, responses: [...acc.responses, response] };
      })
    )
  ),
  takeWhile((acc) => acc.requests.length, true),
  map((acc) => acc.responses)
);

responses$.subscribe((responses) => {
  console.log(responses);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
Upvotes: 1
Reputation: 798
OK, if I understand you correctly, you simply need to track multiple async HTTP requests. You would like to track each one separately and work with its result as soon as it completes, and you also need to know when all requests are done, which is why you need to forkJoin all the requests. In that case I think your forEach + forkJoin approach is good, although, as you said in your edit, subscribing twice to an HTTP request will call the server twice, which is undesired behaviour. I would simply create another Observable array whose sole purpose is to track the completion of all events.
const requests: Observable<any>[] = getRequests();
const finishTracker: Observable<void>[] = [];

requests.forEach(obs => {
  const innerTracker = new Subject<void>();
  finishTracker.push(innerTracker.asObservable());
  obs.subscribe(res => {
    innerTracker.next();
    innerTracker.complete();
    // Do whatever you want with your image response
    doSomethingWithRes(res);
  });
});
forkJoin(finishTracker).subscribe(res => console.log('all finished'));
WARNING! Because subscribe is called in a forEach loop and HTTP requests are async, I think this may lead to undesired behaviour where forkJoin fires complete too early, or not at all, because an innerTracker.complete() fires before forEach has finished populating the array. In that case you will need to create a separate array of objects before calling subscribe.
const requests: Observable<any>[] = getRequests();
const additionalArray: any[] = [];
const finishTracker: Observable<void>[] = [];

requests.forEach(obs => {
  const innerTracker = new Subject<void>();
  finishTracker.push(innerTracker.asObservable());
  additionalArray.push({ myObs: obs, tracker: innerTracker });
});

forkJoin(finishTracker).subscribe(res => console.log('all finished'));

additionalArray.forEach(myObj => {
  myObj.myObs.subscribe(
    res => {
      myObj.tracker.next();
      myObj.tracker.complete();
      // Do whatever you want with your image response
      doSomethingWithRes(res);
    },
    err => {
      doSomethingWithErr(err);
    }
  );
});
You can move this filling process into your getRequests() function. This might not be the most elegant solution, but it works.
Upvotes: 1
Reputation: 54773
What about a simple combineLatest?
import { Observable, combineLatest, startWith } from 'rxjs';

declare const getRequests: () => Observable<any>[];

const requests: Observable<any>[] = getRequests();

combineLatest(requests.map(r => r.pipe(startWith(null)))).subscribe(result => {
  if (!result.some(r => r === null)) {
    // everything loaded
  } else {
    // not finished
  }
});
Upvotes: 1