ThatOneConfusedGuy

Reputation: 21

How do I gather data from various requests using RxJS in Angular in a non-hideous way?

My use case is simple and, I would think, ubiquitous, yet I cannot find a single tutorial that explains it decently.

I want to fetch a variety of data in the ngOnInit() of my Angular component, some of it sequentially, some concurrently. Once all that is done, I need to perform synchronous operations on the collected data. I want my code to be halfway readable.

Suppose I have the following (all observables return once, no actual streams):

I simply want to get all that data, pack it into a single observable, and subscribe to that so I can run my actual operation in the subscription's callback.

What I am looking for: A simple pseudocode / code structure that tells me which operators to put in which sequence. Bonus points for the "why".

$Obs1.pipe(mergeMap(...))

Extra bonus for readability, if RxJS is at all compatible with clean code. Yes, I am frustrated by my inability to grasp this and by the utter lack of examples on the matter.

Upvotes: 1

Views: 1181

Answers (4)

James Y

Reputation: 161

A late response, but this is an issue that has bugged me for a long time and I have recently been involved in implementing a utility operator to explicitly address it. As you say, this is "simple and ubiquitous" and feels like it should have a simple implementation.

With your specific example, the code using this operator, concatJoin, looks something like:

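ngOnInit() {
  concatJoin(
    // 1. emit the id from the query parameters
    this.route.queryParams.pipe(pluck('id')),
    // 2. use the id to issue the first request
    ([id]) => req1$(id),
    // 3. use the first result to issue the remaining requests in parallel
    ([, idData]) => forkJoin([req2$(idData), req3$(idData), req4$(idData)]),
  ).subscribe(
    ([id, idData, [data2, data3, data4]]) => ...
  );
}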

By default, requests are issued sequentially, with the results aggregated into an array. Alternatively, the operator accepts objects (like forkJoin), which lets you intermix sequential and parallel requests:

ngOnInit() {
  concatJoin(
    {id: this.route.queryParams.pipe(pluck('id'))},
    {idData: ({id}) => req1$(id)},
    {  // requests in the same object are issued in parallel 
      data2: ({idData}) => req2$(idData), 
      data3: ({idData}) => req3$(idData), 
      data4: ({idData}) => req4$(idData),
    }  
  ).subscribe( 
    ({id, idData, data2, data3, data4}) => ...
  );
}
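For comparison, the sequential-then-parallel flow of the first example can be sketched with plain RxJS operators. This is not how concatJoin is implemented, only an approximation of the same result shape. `id$` and `req1$` through `req4$` are hypothetical stand-ins built with `of` so the snippet runs on its own.

```typescript
import { Observable, forkJoin, of } from "rxjs";
import { concatMap, map } from "rxjs/operators";

// Hypothetical stand-ins for the route parameter and the four requests.
const id$: Observable<number> = of(7);
const req1$ = (id: number): Observable<string> => of(`idData-${id}`);
const req2$ = (idData: string): Observable<string> => of(`${idData}/data2`);
const req3$ = (idData: string): Observable<string> => of(`${idData}/data3`);
const req4$ = (idData: string): Observable<string> => of(`${idData}/data4`);

type Aggregate = [number, string, [string, string, string]];

const aggregated: Aggregate[] = [];

id$
  .pipe(
    // Step 1: wait for the id, issue req1, and keep the id next to its result.
    concatMap(id =>
      req1$(id).pipe(map(idData => [id, idData] as [number, string]))
    ),
    // Step 2: issue req2..req4 in parallel and append their results to the tuple.
    concatMap(([id, idData]) =>
      forkJoin([req2$(idData), req3$(idData), req4$(idData)]).pipe(
        map(rest => [id, idData, rest] as Aggregate)
      )
    )
  )
  .subscribe(result => aggregated.push(result));
```

Each step has to carry the earlier results forward by hand with an inner `map`, which is the bookkeeping the operator is meant to remove.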

The operator is available at rxjs-concat-join, and the details are provided in the article "Aggregating Results Across Sequential RxJS Requests".

Upvotes: 0

bryan60

Reputation: 29335

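import { forkJoin } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

requestRest(data1) {
  // parallel request logic encapsulated in a function
  // (array form: passing the observables as separate arguments is deprecated)
  return forkJoin([
    this.req2(data1),
    this.req3(data1),
    this.req4(data1)
  ]);
}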

ngOnInit() {
  // store id observable in a reusable variable (just to illustrate concept)
  const id$ = this.route.params.pipe(
    map(p => p['id'])
  );

  id$.pipe(
    // switch off id into req1
    switchMap(id => this.req1(id)),
    // switch off req1 into rest of data
    switchMap(data1 => this.requestRest(data1).pipe(
      // inner map to combine data1 with rest of data
      map(restOfData => [data1, ...restOfData])))
  ).subscribe(
    ([d1, d2, d3, d4]) => console.log("got all the data")
  );
}

RxJS is a powerful library for building pipelines that handle streams of data. You can build and compose pipelines however you see fit. There is a bit of a learning curve, but once you get over it, you can handle any data stream you can think of. Every front-end event (HTTP requests, WebSockets, form inputs, timeouts, intervals, clicks, mouse moves, etc.) is a data stream, and that makes RxJS a very valuable tool to learn.

This is a fairly trivial example as far as RxJS streams go, since sequential execution, parallel execution, and transforms are the basics. Once you add error handling, retry logic, caching, and the other things most production applications need, it becomes even more powerful.
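As a rough illustration of the retry and error-handling part, here is a minimal sketch using the standard `retry` and `catchError` operators. The flaky source, the retry count of 2, and the `"fallback"` value are all made up for the example; a real request would go where `flakyRequest$` is.

```typescript
import { Observable, of } from "rxjs";
import { catchError, retry } from "rxjs/operators";

let attempts = 0;

// Hypothetical flaky request: errors on the first two subscriptions, then succeeds.
const flakyRequest$ = new Observable<string>(subscriber => {
  attempts += 1;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next("payload");
    subscriber.complete();
  }
});

const received: string[] = [];

flakyRequest$
  .pipe(
    // Resubscribe to the source up to two more times when it errors.
    retry(2),
    // If every attempt fails, emit a fallback value instead of an error.
    catchError(() => of("fallback"))
  )
  .subscribe(value => received.push(value));
```

Because these are ordinary operators, they can be attached to a single request or to the whole pipeline, depending on what should be retried.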

switchMap alone is one of the most powerful tools RxJS offers: if the id parameter changes while your requests are in flight, it cancels the outdated requests, guaranteeing that you get the results for the correct id and avoiding race conditions between requests.
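The cancellation behavior can be seen in a small standalone sketch. Everything here is a stand-in: `id$` plays the role of the route parameter, and `fakeRequest` returns a Subject that is resolved by hand to simulate a response arriving later.

```typescript
import { Observable, Subject } from "rxjs";
import { map, switchMap } from "rxjs/operators";

// Hypothetical stand-ins: one hand-resolved Subject per requested id.
const id$ = new Subject<number>();
const pending: { [id: number]: Subject<string> } = {};

function fakeRequest(id: number): Observable<string> {
  const response$ = new Subject<string>();
  pending[id] = response$;
  return response$.pipe(map(body => `id ${id}: ${body}`));
}

const results: string[] = [];
id$.pipe(switchMap(id => fakeRequest(id))).subscribe(r => results.push(r));

id$.next(1); // the request for id 1 is now in flight
id$.next(2); // the id changed: switchMap unsubscribes from the id 1 request

pending[1].next("stale response"); // arrives late and is ignored
pending[2].next("fresh response"); // only this one reaches the subscriber
```

With `mergeMap` in place of `switchMap`, both responses would reach the subscriber, in whatever order they happened to arrive.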

Upvotes: 2

Andrei Gătej

Reputation: 11969

Here is one possible approach:

ngOnInit() {
  this.route.queryParamMap // assumes `route` is the injected ActivatedRoute
    .pipe(
      map(getYourId),
      exhaustMap(
        entityId => this.http.get(/* ... */).pipe(
          exhaustMap( // `entityId` also available here
            results => forkJoin([
              of(results),
              this.http.get(''), // Request 1
              this.http.get(''), // Request 2
              this.http.get(''), // Request 3
            ])
          )
        )
      ),
    )
    .subscribe()
}

exhaustMap can have only one active inner observable. If an outer value arrives while that inner observable is still active, the value is skipped; everything is skipped until the current inner observable completes.
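A small sketch of that skipping behavior, using Subjects that are driven by hand (the names `outer$` and `inner$` are purely illustrative):

```typescript
import { Subject } from "rxjs";
import { exhaustMap, map } from "rxjs/operators";

const outer$ = new Subject<string>();
// Hypothetical inner observable, completed by hand to mimic a slow request.
let inner$ = new Subject<string>();

const seen: string[] = [];
outer$
  .pipe(
    exhaustMap(outerValue =>
      inner$.pipe(map(innerValue => `${outerValue}:${innerValue}`))
    )
  )
  .subscribe(v => seen.push(v));

outer$.next("a"); // the inner observable for "a" becomes active
outer$.next("b"); // skipped: the inner observable for "a" has not completed
inner$.next("done");
inner$.complete(); // "a" is finished, so the next outer value will be accepted

inner$ = new Subject<string>();
outer$.next("c"); // accepted
inner$.next("done");
```

The value "b" never produces any output, because it arrived while the inner observable for "a" was still running.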

In this case, I wouldn't say it matters which of exhaustMap, mergeMap, concatMap, or switchMap you use, because every observable here emits only once.

The first exhaustMap simply makes the request, and the next exhaustMap subscribes to an inner observable created from the data the first one returns (results).

forkJoin subscribes to its provided observables concurrently, but you only get the array of their responses (the last value of each) once every observable has emitted at least once and completed.
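To make the forkJoin timing concrete, here is a minimal sketch with two hand-driven Subjects (`first$` and `second$` are illustrative names, not part of the code above):

```typescript
import { Subject, forkJoin } from "rxjs";

const first$ = new Subject<number>();
const second$ = new Subject<string>();

const emissions: Array<[number, string]> = [];
forkJoin([first$, second$]).subscribe(pair => emissions.push(pair));

first$.next(1);
first$.next(2); // only the last value before completion is kept
first$.complete();
const emittedBeforeSecondCompleted = emissions.length; // still 0 at this point

second$.next("x");
second$.complete(); // every source has completed, so forkJoin emits once
```

With HTTP requests each source emits a single response and then completes, so the "last value" is simply that response.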

Upvotes: 0

davidmpaz

Reputation: 1387

Please check this: https://stackblitz.com/edit/ewi2gy

I know the learning curve is steep, but sometimes it is better to struggle a bit in order to learn properly; getting an answer that works exactly as you want will not help in the long run.

import { forkJoin, of } from "rxjs";
import Axios from "axios-observable";
import { switchMap, map, mergeMap } from "rxjs/operators";

interface Post {
  id: number;
  title: string;
}

interface Comment {
  id: number;
  body: string;
  postId: number;
}

interface Profile {
  name: string;
}

interface FullPost {
  id: number;
  title: string;
  comments: Comment[];
  profile: string;
}

// this observable gives you the id, assuming from query parameter
const id = of(1);

// then start the stream with that id (subscribe() returns a Subscription)
const postSubscription = id
  .pipe(
    // map the id to the fetched post
    switchMap((id: number) =>
      Axios.get<Post>("https://my-json-server.typicode.com/typicode/demo/posts/" + id)
    ),
    // unwrap the response body, typed as Post
    map(response => response.data),
    // fork your requests to get other data, simplistic example only to make the PoC
    mergeMap((post: Post) =>
      forkJoin({
        // get comments
        comments: Axios.get<Comment[]>(
          "https://my-json-server.typicode.com/typicode/demo/comments?postId=" +
            post.id
        ),
        // get something else ...
        profile: Axios.get<Profile>(
          "https://my-json-server.typicode.com/typicode/demo/profile"
        ),
        // give back also the post as Observable, even more simplistic example
        post: of(post)
      })
    ),
    map(valueObject => {
      // work out the aggregation you got here
      return {
        id: valueObject.post.id,
        title: valueObject.post.title,
        comments: valueObject.comments.data,
        profile: valueObject.profile.data.name,
      } as FullPost;
    })
  )
  .subscribe(fullPost => console.log(fullPost));

Upvotes: 0
