Reputation: 28
I'm working on an Angular 7 project that displays information coming from multiple endpoints of an old, slow backend.
I want to process and display the information from each endpoint as it arrives. This way the user doesn't have to wait for the slowest endpoint and the processing load stays distributed.
When the slowest endpoint finally returns, the combined set should be processed collectively.
Order of arrival is not guaranteed.
- I have these 2 data sources: Rectangles and Squares.
- Calculating the area of each item is a heavy operation.
- Data arrives in complete sets, not item by item.
- I want to sort the combined set when the last set arrives.
Rectangles: [ {x:1 , y:3} , {x:3,y:2} ] 30 seconds to get this
Squares : [ { side:10 } , { side: 2} ] 2 minutes to get this
Desired output:
Displayed at 30 seconds: Rect-Area-3 , Rect-Area-6
Displayed at 2 minutes: Rect-Area-3 , Square-Area-4 , Rect-Area-6 , Square-Area-100
Right now I have code like this for each data source. Data from each source has a completely different JSON structure; the sources have little in common.
public getDemographicData() {
  const myObserver = {
    next: serverData => {
      // Process result, save.
      // Let another component handle data presentation when
      // change detection occurs.
    },
    error: err => console.error('Observer got an error: ' + JSON.stringify(err)),
    complete: () => {}
  };
  // getDemographicsData returns an observable using http.get
  this.dataService.getDemographicsData(surveyId).subscribe(myObserver);
}
I don't think forkJoin is what I want here, since it makes one observable from multiple observables. I don't want to wait for all the data to come in before processing it.
Any help or comments are appreciated. I could not find a satisfactory or understandable answer elsewhere. I only post on help sites as a last resort.
Upvotes: 0
Views: 1283
Reputation: 10157
You do want forkJoin.
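Handle each stream's own data inside a tap on that stream, and trigger the combined computation in the forkJoin subscription. Subscribing to the forkJoin result subscribes to each inner stream, so every tap fires as soon as its own response arrives.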
import { forkJoin, of } from 'rxjs';
import { delay, tap } from 'rxjs/operators';
const fastObs = of('Fast').pipe(delay(1000), tap(console.log)); // One of the faster http requests, console.log will trigger after 1 second
const slowObs = of('Slow').pipe(delay(10000), tap(console.log)); // Slower, triggers after 10 seconds
forkJoin([fastObs, slowObs]).subscribe(console.log); // Triggers after 10 seconds, once both streams have completed
https://stackblitz.com/edit/typescript-qrephm?file=index.ts
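Applied to the Rectangles and Squares example from the question, the same pattern might look like the sketch below. The names `loadShapes`, `rectAreas`, `squareAreas`, `showPartial` and the `ShapeArea` type are hypothetical, and `of(...)` stands in for the real `http.get` calls, so treat this as an illustration under those assumptions rather than a drop-in implementation.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

interface Rectangle { x: number; y: number; }
interface Square { side: number; }
interface ShapeArea { label: string; area: number; } // hypothetical common shape

// Per-source processing (the "heavy" step), run as soon as that set arrives.
const rectAreas = (rects: Rectangle[]): ShapeArea[] =>
  rects.map(r => ({ label: `Rect-Area-${r.x * r.y}`, area: r.x * r.y }));

const squareAreas = (squares: Square[]): ShapeArea[] =>
  squares.map(s => ({ label: `Square-Area-${s.side * s.side}`, area: s.side * s.side }));

// Hypothetical helper: each source is processed and shown on arrival (map + tap);
// forkJoin emits once, after both sources complete, with the merged, sorted set.
function loadShapes(
  rects$: Observable<Rectangle[]>,
  squares$: Observable<Square[]>,
  showPartial: (items: ShapeArea[]) => void
): Observable<ShapeArea[]> {
  return forkJoin([
    rects$.pipe(map(rectAreas), tap(showPartial)),
    squares$.pipe(map(squareAreas), tap(showPartial)),
  ]).pipe(
    map(([rects, squares]) => [...rects, ...squares].sort((a, b) => a.area - b.area))
  );
}

// Demo with the data from the question; of(...) replaces the slow endpoints.
const partials: string[][] = [];
let combined: string[] = [];

loadShapes(
  of([{ x: 1, y: 3 }, { x: 3, y: 2 }]),
  of([{ side: 10 }, { side: 2 }]),
  items => partials.push(items.map(i => i.label))
).subscribe(all => { combined = all.map(i => i.label); });

console.log(partials[0]); // [ 'Rect-Area-3', 'Rect-Area-6' ]
console.log(combined);    // [ 'Rect-Area-3', 'Square-Area-4', 'Rect-Area-6', 'Square-Area-100' ]
```

Because the per-source work is done in `map` before `forkJoin` sees the value, the combined step only has to merge and sort results that were already computed. Note that `forkJoin` errors as soon as any source errors, so a failing endpoint would need its own `catchError` if the other set should still be displayed.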
Upvotes: 1