Reputation: 21
My use case is so simple that I would expect it to be ubiquitous, yet I cannot find a single tutorial that explains it decently.
I want to fetch a variety of data in the onInit() of my Angular module, some sequentially, some concurrently. Once all of that is done, I need to perform synchronous operations on the collected data. I want my code to be halfway readable.
Suppose I have the following (all observables return once, no actual streams):
I simply want to get all that data and pack it into one single observable, then subscribe to that so I can run my actual operation in that subscription's callback.
What I am looking for: A simple pseudocode / code structure that tells me which operators to put in which sequence. Bonus points for the "why".
$Obs1.pipe(mergeMap(...))
Extra bonus for readability, if RxJS is at all compatible with clean code. Yes, I am frustrated by my inability to grasp this and by the utter lack of examples on the matter.
Upvotes: 1
Views: 1181
Reputation: 161
A late response, but this is an issue that has bugged me for a long time and I have recently been involved in implementing a utility operator to explicitly address it. As you say, this is "simple and ubiquitous" and feels like it should have a simple implementation.
With your specific example the code using this operator "concatJoin" looks something like:
ngOnInit() {
  concatJoin(
    this.route.queryParams.pipe(pluck('id')),
    ([id]) => req1$(id),
    ([, idData]) => forkJoin([req2$(idData), req3$(idData), req4$(idData)]),
  ).subscribe(
    ([id, idData, [data2, data3, data4]]) => ...
  );
}
By default, requests are issued sequentially, with the results aggregated into an array. Alternatively, the operator accepts objects (like forkJoin), which lets you intermix sequential and parallel requests:
ngOnInit() {
  concatJoin(
    {id: this.route.queryParams.pipe(pluck('id'))},
    {idData: ({id}) => req1$(id)},
    { // requests in the same object are issued in parallel
      data2: ({idData}) => req2$(idData),
      data3: ({idData}) => req3$(idData),
      data4: ({idData}) => req4$(idData),
    }
  ).subscribe(
    ({id, idData, data2, data3, data4}) => ...
  );
}
The operator is available at rxjs-concat-join, and the details are provided in the article Aggregating Results Across Sequential RxJS Requests.
Upvotes: 0
Reputation: 29335
requestRest(data1) {
  // parallel request logic encapsulated in a function
  // (sources passed as an array; the variadic form of forkJoin is deprecated)
  return forkJoin([
    this.req2(data1),
    this.req3(data1),
    this.req4(data1)
  ]);
}
ngOnInit() {
  // store id observable in a reusable variable (just to illustrate concept)
  const id$ = this.route.params.pipe(
    map(p => p['id'])
  );
  id$.pipe(
    // switch off id into req1
    switchMap(id => this.req1(id)),
    // switch off req1 into rest of data
    switchMap(data1 => this.requestRest(data1).pipe(
      // inner map to combine data1 with rest of data
      map(restOfData => [data1, ...restOfData])
    ))
  ).subscribe(
    ([d1, d2, d3, d4]) => console.log("got all the data")
  );
}
RxJS is a powerful library for building pipelines that handle streams of data. You can build and compose different pipelines however you see fit. There is a bit of a learning curve, but once you get over it, you can handle any data stream you can think of. Every front-end event (HTTP requests, WebSockets, form inputs, timeouts, intervals, clicks, mouse moves, etc.) is a data stream, and that makes RxJS a very valuable tool to learn.
This is a fairly trivial example as far as RxJS streams go, since sequential and parallel execution and transforms are the basics. When you consider things like error handling, retry logic, caching, and others, which most production applications will want, it gets even more powerful.
switchMap alone is one of the most powerful tools offered by RxJS: it handles the cancellation logic if the id parameter changes while your requests are in flight, guaranteeing that you get the results for the correct id and don't risk bugs from race conditions between requests.
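To make the cancellation behavior concrete, here is a minimal sketch. It assumes a hypothetical `fakeRequest` helper that stands in for an HTTP call and is backed by a `Subject`, so the example decides when each "response" arrives; none of these names come from the answer above.

```typescript
import { Observable, Subject } from "rxjs";
import { switchMap } from "rxjs/operators";

// Hypothetical stand-in for an HTTP call: each id gets a Subject that is
// resolved by hand, so the example controls when a "response" arrives.
const pending = new Map<number, Subject<string>>();

function fakeRequest(id: number): Observable<string> {
  const response$ = new Subject<string>();
  pending.set(id, response$);
  return response$;
}

const id$ = new Subject<number>();
const received: string[] = [];

id$
  .pipe(switchMap(id => fakeRequest(id)))
  .subscribe(value => received.push(value));

id$.next(1); // request for id 1 is now in flight
id$.next(2); // id changes: switchMap unsubscribes from request 1

pending.get(1)!.next("data for 1"); // late response for the stale id is dropped
pending.get(2)!.next("data for 2"); // response for the current id is delivered

console.log(received); // only "data for 2" was received
```

With a real Angular `HttpClient` call, the unsubscribe step is what aborts the outdated request.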
Upvotes: 2
Reputation: 11969
Here is one possible approach:
ngOnInit() {
  this.router.queryParamMap
    .pipe(
      map(getYourId),
      exhaustMap(
        entityId => this.http.get(/* ... */).pipe(
          exhaustMap( // `entityId` also available here
            results => forkJoin([
              of(results),
              this.http.get(''), // Request 1
              this.http.get(''), // Request 2
              this.http.get(''), // Request 3
            ])
          )
        )
      ),
    )
    .subscribe()
}
exhaustMap can have only one active inner observable at a time; if an outer value arrives while it is active, that value is skipped. Everything is skipped until the current inner observable completes.
In this case, I wouldn't say it matters which one of exhaustMap, mergeMap, concatMap, or switchMap you use.
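The operators only start to differ once the outer observable emits again while an inner request is still pending. The sketch below illustrates the skipping behavior of exhaustMap next to mergeMap; the `scenario` helper and the hand-resolved `Subject` requests are hypothetical scaffolding for the demonstration, not part of the approach above.

```typescript
import { Observable, Subject } from "rxjs";
import { exhaustMap, mergeMap } from "rxjs/operators";

type Request = (id: number) => Observable<string>;

// Hypothetical scenario: two ids arrive back to back, then both "responses"
// come in. Each request is a Subject resolved by hand, so timing is controlled.
function scenario(
  build: (id$: Observable<number>, request: Request) => Observable<string>
): string[] {
  const pending = new Map<number, Subject<string>>();
  const request: Request = id => {
    const response$ = new Subject<string>();
    pending.set(id, response$);
    return response$;
  };
  const id$ = new Subject<number>();
  const received: string[] = [];
  build(id$, request).subscribe(value => received.push(value));

  id$.next(1); // first request starts
  id$.next(2); // arrives while request 1 is still pending
  pending.get(1)?.next("data for 1");
  pending.get(2)?.next("data for 2"); // no-op if request 2 was never issued
  return received;
}

// exhaustMap skips id 2 because request 1 has not completed yet.
const withExhaustMap = scenario((id$, request) =>
  id$.pipe(exhaustMap(id => request(id)))
);

// mergeMap subscribes to both requests and forwards both responses.
const withMergeMap = scenario((id$, request) =>
  id$.pipe(mergeMap(id => request(id)))
);

console.log(withExhaustMap); // contains only "data for 1"
console.log(withMergeMap);   // contains "data for 1" and "data for 2"
```

When the source emits exactly once, as in the question, both runs would produce the same result.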
The first exhaustMap simply makes the request, and the next exhaustMap subscribes to an inner observable created from the data that comes from the first one (results).
forkJoin subscribes to its provided observables concurrently, but you only get the array of their responses once every observable has emitted at least once and completed.
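A small sketch of that forkJoin behavior, using synchronous `of(...)` sources as stand-ins for HTTP calls (an assumption made purely for illustration): the result is emitted once, it carries the last value of each source, and nothing is emitted if a source completes without a value.

```typescript
import { EMPTY, forkJoin, of } from "rxjs";

const emissions: unknown[] = [];

// Every source completes, so forkJoin emits exactly once,
// carrying the LAST value of each source.
forkJoin([of(1, 2, 3), of("a", "b")]).subscribe(pair => emissions.push(pair));

let emittedFromEmpty = false;
let completedFromEmpty = false;

// If any source completes without emitting, forkJoin completes
// without emitting anything either.
forkJoin([of(1), EMPTY]).subscribe({
  next: () => { emittedFromEmpty = true; },
  complete: () => { completedFromEmpty = true; },
});

console.log(emissions);          // a single emission: the pair 3 and "b"
console.log(emittedFromEmpty);   // false
console.log(completedFromEmpty); // true
```

This is why forkJoin suits one-shot requests such as `HttpClient` calls, which emit once and complete.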
Upvotes: 0
Reputation: 1387
Please check this: https://stackblitz.com/edit/ewi2gy
I know the learning curve is steep, but sometimes it is better to struggle a bit in order to learn properly; getting an answer that works exactly as you want will not help in the long run.
import { forkJoin, of } from "rxjs";
import Axios from "axios-observable";
import { switchMap, map, mergeMap } from "rxjs/operators";

interface Post {
  id: number;
  title: string;
}
interface Comment {
  id: number;
  body: string;
  postId: number;
}
interface Profile {
  name: string;
}
interface FullPost {
  id: number;
  title: string;
  comments: Comment[];
  profile: string;
}

// this observable gives you the id, assuming from query parameter
const id = of(1);
// then start the stream with that id
const postsObservable = id
  .pipe(
    // map the id to the fetched post
    switchMap((id: number) =>
      Axios.get("https://my-json-server.typicode.com/typicode/demo/posts/" + id)
    ),
    // convert response into an object of type Post
    map(response => response.data),
    // fork your requests to get other data, simplistic example only to make the PoC
    mergeMap((post: Post) =>
      forkJoin({
        // get comments
        comments: Axios.get<Comment[]>(
          "https://my-json-server.typicode.com/typicode/demo/comments?postId=" +
            post.id
        ),
        // get something else ...
        profile: Axios.get<Profile>(
          "https://my-json-server.typicode.com/typicode/demo/profile"
        ),
        // give back also the post as Observable, even more simplistic example
        post: of(post)
      })
    ),
    map(valueObject => {
      // work out the aggregation you got here
      return {
        id: valueObject.post.id,
        title: valueObject.post.title,
        comments: valueObject.comments.data,
        profile: valueObject.profile.data.name,
      } as FullPost;
    })
  )
  .subscribe(fullPost => console.log(fullPost));