Reputation: 11
I am trying to figure out how to send AJAX requests every five seconds while mapping the data and reducing it using RxJS operators.
So, there's a simple Angular service that requests an external API for astronauts that are in space at the moment: http://api.open-notify.org/astros.json
In the service, I try to first set the interval(5000)
and then use an RxJS mapping operator to map the incoming number to a GET request.
The task is this: I need to first get astronauts, then find those who are flying on ISS, and then those astronauts I can render into a view. The data must be updated each 5 seconds; so I need to re-send the same HTTP request each 5 seconds, which I can do with setInterval()
. And this is not the cleanest solution to the problem.
Here's the code:
import { Injectable } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError, interval } from 'rxjs';
import { catchError, filter, concatMap, flatMap, map, reduce } from 'rxjs/operators';
import { Astronaut } from '../models/astronaut.model';
const astronautsUri = 'http://api.open-notify.org/astros.json';
@Injectable({
providedIn: 'root'
})
export class IssLocatorService {
constructor(private http: HttpClient) {}
getAstronauts(): Observable<Astronaut[]> {
return interval(5000).pipe(
concatMap(num => this.http.get(astronautsUri)),
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
map((astronaut: Astronaut) => [astronaut]),
reduce((prev, next) => [...prev, ...next]),
catchError(this.handleError)
);
}
}
The code doesn't work, alas.
Although the flow do gets to the reduce()
operator, the operator doesn't return to the subscribe()
method in the component.
It seems to me, though, that the solution should work fine. Here's how I think it works:
concatMap()
waits until the inner Observable completes and only then does it get the second number - 1.{success:true, people: [...]}
), so then I use flatMap()
to transform this object to the array of astronauts people
. Each object in people
then becomes an Observable due to how flatMap()
works.astronauts.people
gets reproduced thanks to reduce()
.reduce()
, according to the specification, should return to subscribe
because the inner Observable has completed. But it doesn't: reduce()
waits until the next number is produced by interval()
and is mapped to an inner Observable, again people
are returned and pushed to the same array. And it goes on and on.If I replace reduce()
with scan
, the array or astronauts does get returned to the subscribe
method. However, this array continually gets larger due to astronaut objects being repeatedly pushed into it.
The following approach works just fine:
return this.http.get(astronautsUri).pipe(
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
map((astronaut: Astronaut) => [astronaut]),
reduce((prev, next) => [...prev, ...next]),
catchError(this.handleError)
);
But in this case, I have to manually set the interval with setInterval()
in the component class that renders astronauts, and I have to call the getAstronauts()
method. So, there are two calls of the method in ngOnInit
, basically.
How can I achieve the desired effect with just RxJS operators? I want to set up an interval, map and filter and reduce the array of objects, and then receive them.
My understanding of how RxJS mapping works is really bad, but I tried (for the sake of trying) all these methods - switchMap()
, concatMap()
, exhaustMap()
, flatMap()
- to map numbers from interval()
to an AJAX request. It still doesn't work.
Upvotes: 0
Views: 1247
Reputation: 21658
Change reduce to scan, scan does the same thing as reduce but emits on each iteration, reduce only emits once the whole stream completes.
const { from } = rxjs;
const { scan } = rxjs.operators;
const obs$ = from([1,2,3,4,5]);
obs$.pipe(scan((total, item) => total + item)).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
const { from } = rxjs;
const { reduce } = rxjs.operators;
const obs$ = from([1,2,3,4,5]);
obs$.pipe(reduce((total, item) => total + item)).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
Upvotes: 0
Reputation: 12206
I think what you are trying to achieve is
getAstronauts(): Observable<Astronaut[]> {
return interval(5000).pipe(
concatMap(num =>
this.http.get(astronautsUri).pipe(
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
reduce((prev, astronaut) => [...prev, astronaut], []),
)
),
catchError(this.handleError)
);
}
the problem of reduce method is it is waiting for its source observable to complete, before emiting any value. in my code source is elements of one request and in question example source is all elements from all requests, which never ends
Upvotes: 0