Reputation: 21
I need some help from RxJS professionals :)
I'm trying to recursively load data from a REST API via HTTP requests. The recursive calls work fine; however, when I subscribe to the final Observable (returned by GetTemperatures), no data arrives in subscribe.
It seems no data is passed back up the call chain.
What's going wrong here?
GetTemperatures().subscribe((data: MeasureData) => {
  // add data to a chart, etc...
})
GetTemperatures(): Observable<MeasureData> {
  const l_startDate = new Date(2019, 0, 1);
  var l_httpParams = new HttpParams()
    .set('device_id', this._deviceId)
    .set('module_id', this._moduleId)
    .set('scale', '1hour')
    .set('type', 'Temperature')
    .set('date_begin', Math.floor(l_startDate.getTime() / 1000).toString())
    .set('real_time', 'true')
    .set('optimize', 'true');
  return this._http.post<MeasureDataInternal>(this._getMeasureUrl, l_httpParams)
    .pipe(
      map((data: MeasureDataInternal): MeasureData => this.transformMeasureData(data)),
      flatMap((data: MeasureData) => {
        return this.recursiveLoadData(data);
      })
    );
}
recursiveLoadData(data: MeasureData): Observable<MeasureData> {
  // search until now minus 1,5 hours
  const endDate = new Date(Date.now() - (1.5 * 60 * 60 * 1000));
  console.error('RECURSIVE begin: ' + data.value[0].date + ' end: ' + data.value[data.value.length - 1].date);
  // check if complete
  if (data.value[data.value.length - 1].date.getTime() >= endDate.getTime()) {
    console.error('recursive ENDs here');
    return EMPTY;
  }
  var l_httpParams = new HttpParams()
    .set('device_id', this._deviceId)
    .set('module_id', this._moduleId)
    .set('scale', '1hour')
    .set('type', 'Temperature')
    .set('date_begin', Math.floor(data.value[data.value.length - 1].date.getTime() / 1000).toString())
    .set('real_time', 'true')
    .set('optimize', 'true');
  return this._http.post<MeasureDataInternal>(this._getMeasureUrl, l_httpParams)
    .pipe(
      map((data2: MeasureDataInternal): MeasureData => this.transformMeasureData(data2)),
      flatMap((data2: MeasureData) => {
        return this.recursiveLoadData(data2);
      })
    );
}
Upvotes: 0
Views: 127
Reputation: 8062
I'm not sure what you're ultimately trying to accomplish, but each step in your recursion does nothing except take you to the next step. You'll want to include whatever you're hoping each step does.
This isn't specific to streams; it's true of recursion in general.
Say you're recursively adding up the numbers in an array: you need to add the first value to the sum of the rest of the array. If you just keep recursing on a smaller array without adding the numbers you've removed, you get only the base-case value back.
This returns the last value of the array (the last value of the array is the base case):
function recursiveAdd(array) {
  if (array.length === 1) return array[0];
  // slice(1) is the rest of the array; shift() would return the removed element instead
  return recursiveAdd(array.slice(1));
}
This adds the array:
function recursiveAdd(array) {
  if (array.length === 1) return array[0];
  // add the first value to the sum of the rest
  return array[0] + recursiveAdd(array.slice(1));
}
In this simple case, the + operator is doing the work at each step of the recursion. Without it, the array isn't summed. And, of course, you could do anything at that step: subtract the array from 1000, average the numbers in the array, build an object from the values. Anything.
Before you make a recursive call, you have to do something with the current value, unless all you're after is the value of the base case (in your case, an empty stream).
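To make the "you could do anything" point concrete, here is a minimal sketch in plain TypeScript where the per-step work is passed in as a function. The names recursiveFold, base, and combine are made up for this illustration; they are not part of RxJS or of your code.

```typescript
// Hypothetical helper: the recursion is fixed, the per-step work is the `combine` argument.
function recursiveFold<T, R>(
  items: readonly T[],
  base: (last: T) => R,
  combine: (head: T, rest: R) => R
): R {
  if (items.length === 0) {
    throw new Error("recursiveFold needs at least one item");
  }
  const head = items[0];
  const tail = items.slice(1);
  // Base case: only one item left.
  if (tail.length === 0) return base(head);
  // The recursive result only becomes useful once it is combined with `head`.
  return combine(head, recursiveFold(tail, base, combine));
}

const numbers = [1, 2, 3, 4];

// Ignoring `head` at every step just hands back the base-case value.
const lastOnly = recursiveFold<number, number>(numbers, (x) => x, (_head, rest) => rest);

// Adding `head` at every step sums the array.
const sum = recursiveFold<number, number>(numbers, (x) => x, (head, rest) => head + rest);

// Collecting `head` at every step rebuilds the whole list.
const collected = recursiveFold<number, number[]>(
  numbers,
  (x) => [x],
  (head, rest) => [head].concat(rest)
);

console.log(lastOnly, sum, collected);
```

The first call is the shape your recursion has right now: nothing is done with the current value, so only the base case survives.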
When you mergeMap a value into a stream, you don't also pass forward that value.
from([69,70,71]).pipe(
  mergeMap(val => from([
    String.fromCharCode(val),
    String.fromCharCode(val),
    String.fromCharCode(val)
  ]))
).subscribe(console.log);
Output:
E E E F F F G G G
Notice how the output doesn't include any numbers? When you mergeMap, you map values into streams. If you want the values you're mapping to be part of the stream, you must include them somehow. This is the same as with general recursion.
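The same effect can be seen without RxJS at all. The sketch below is a plain-array analogue, not mergeMap itself: flatMapArray is a hypothetical helper that maps each value to an array and flattens the results, which is roughly what mergeMap does with streams.

```typescript
// Hypothetical array stand-in for mergeMap: map each value to an array, then flatten.
function flatMapArray<T, R>(values: readonly T[], project: (value: T) => R[]): R[] {
  const out: R[] = [];
  values.forEach((value) => {
    project(value).forEach((item) => out.push(item));
  });
  return out;
}

const codes = [69, 70, 71];

// Only what `project` returns ends up in the result; the codes themselves are gone.
const lettersOnly = flatMapArray<number, string>(codes, (code) => [
  String.fromCharCode(code),
]);

// To keep the source values, `project` has to put them into what it returns.
const codesAndLetters = flatMapArray<number, number | string>(codes, (code) => [
  code,
  String.fromCharCode(code),
]);

console.log(lettersOnly);     // [ 'E', 'F', 'G' ]
console.log(codesAndLetters); // [ 69, 'E', 70, 'F', 71, 'G' ]
```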
So, here are two examples that both include your data in the returned stream. They're very basic, but hopefully, you can take some understanding from them and apply that.
This transforms the returned stream to include your data as its first value (recursively, of course):
return this._http.post<MeasureDataInternal>(this._getMeasureUrl, l_httpParams)
  .pipe(
    map((data: MeasureDataInternal): MeasureData =>
      this.transformMeasureData(data)
    ),
    mergeMap((data: MeasureData) =>
      this.recursiveLoadData(data).pipe(
        startWith(data)
      )
    )
  );
This creates a stream of your data, a stream of your recursive call, and merges the two streams together.
return this._http.post<MeasureDataInternal>(this._getMeasureUrl, l_httpParams)
  .pipe(
    map((data: MeasureDataInternal): MeasureData =>
      this.transformMeasureData(data)
    ),
    mergeMap((data: MeasureData) =>
      merge(
        of(data),
        this.recursiveLoadData(data)
      )
    )
  );
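If it helps to see the difference outside of RxJS, here is a rough synchronous analogue of the paged loading, assuming a fake data source. fetchPage, LAST_READING, and both loadRest functions are invented for this sketch; arrays stand in for streams, an empty array stands in for EMPTY, and putting the page in front of the recursive result plays the role of startWith.

```typescript
// Hypothetical stand-in for the HTTP call: up to three readings that come after `from`.
const LAST_READING = 8;
function fetchPage(from: number): number[] {
  const page: number[] = [];
  for (let n = from + 1; n <= LAST_READING && page.length < 3; n++) {
    page.push(n);
  }
  return page;
}

// Mirrors the original recursion: each step only moves on to the next step.
function loadRestDropping(page: number[]): number[][] {
  if (page.length === 0) return [];
  const last = page[page.length - 1];
  if (last >= LAST_READING) return []; // base case, like EMPTY
  const next = fetchPage(last);
  return loadRestDropping(next); // `next` is never handed back to the caller
}

// Each step puts the page it fetched in front of the recursive result.
function loadRestKeeping(page: number[]): number[][] {
  if (page.length === 0) return [];
  const last = page[page.length - 1];
  if (last >= LAST_READING) return []; // base case, like EMPTY
  const next = fetchPage(last);
  return [next].concat(loadRestKeeping(next));
}

const first = fetchPage(0);
const dropped = loadRestDropping(first);
const kept = [first].concat(loadRestKeeping(first));

console.log(dropped); // []
console.log(kept);    // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8 ] ]
```

The dropping version always ends with the base-case value, which is why your subscribe never sees anything.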
Upvotes: 1