Ankit Tanna

Reputation: 1819

Recursively calling an API using RxJS expand operator

I have an API that returns search results 100 records at a time. The response includes a cursor key if there are further results.

For example: 1st request payload

{
  cursor: "",
  query: "abc"
}

Response:

{
  results: [...],
  totalCount: 525,
  cursor: "1234"
}

2nd request payload:

{
  cursor: "1234",
  query: "abc"
}

This returns the next 100 records.

So, in all, I will be making 6 calls for 525 records.

Since the API relies on a cursor, each request depends on the previous response, so the calls have to be made recursively. I was thinking of using RxJS's expand operator along with the pluck operator to extract the id of each result object.

Result object structure: { id: "123" }

My aim is to collect all 525 record ids and emit them as a single array to the subscriber of this API call. I tried the code below but could not get it to work.

private searchRecordsByQueryWithCursor(
    query: string,
    cursor = ''
  ): Observable<RecordIds> {
    const payload = {
      limit: 100,
      query: query,
      returnedFields: ['id'],
      cursor: cursor
    };
    return this.coreService.fetchSearchResultsByCursor(payload).pipe(
      expand((response) => {
        if (response.cursor) {
          return this.coreService
            .fetchSearchResultsByCursor({
              ...payload,
              cursor: response.cursor
            })
            .pipe(
              catchError(() => {
                return of({
                  results: []
                });
              })
            );
        }
        return EMPTY;
      }),
      pluck('results'),
      reduce((acc: RecordIds, val) => {
        return [...acc, ...val];
      })
    );
  }

Could anyone suggest the correct structure for the RxJS pipeline?

Output required: [ '123', '456', '789'... ]

all 525 ids.

Upvotes: 1

Views: 1690

Answers (2)

Matthew Heaney

Reputation: 63

As an alternative to terminating via takeWhile() as in the example from BizzyBob, another option is to simply do the test directly inside of expand(), analogous to the base case in a recursive function:

const items$ = fetch().pipe(
  expand((response) => {
    if (!response.cursor) {
      return EMPTY;
    }
    return fetch(response.cursor);
  }),
  reduce((all, { data }) => all.concat(data), [])
);

This works because expand() passes every response downstream to reduce(), including the final one that has no cursor; the cursor check only decides whether another request follows. Returning EMPTY ends the recursion, and the stream completes once no requests remain.

Here's a working StackBlitz example.
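For the shape described in the question (string cursor, results containing { id } objects), the same idea might look like the sketch below. The fetchPage() function and its in-memory data are hypothetical stand-ins for coreService.fetchSearchResultsByCursor(), and encoding the cursor as a page offset is purely an assumption made for the demo.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, map, reduce } from 'rxjs/operators';

// Response shape taken from the question.
interface SearchResponse {
  results: { id: string }[];
  cursor?: string;
}

const PAGE_SIZE = 100;
const ALL_IDS = Array.from({ length: 525 }, (_, i) => String(i + 1));
let requestCount = 0;

// Hypothetical stand-in for the real API call: the cursor is simply the
// offset of the next page, encoded as a string (an assumption for this demo).
function fetchPage(cursor = ''): Observable<SearchResponse> {
  requestCount++;
  const start = cursor === '' ? 0 : Number(cursor);
  const end = start + PAGE_SIZE;
  return of({
    results: ALL_IDS.slice(start, end).map((id) => ({ id })),
    cursor: end < ALL_IDS.length ? String(end) : undefined,
  });
}

const ids$: Observable<string[]> = fetchPage().pipe(
  // Base case of the recursion: no cursor means no further request.
  expand((response) => (response.cursor ? fetchPage(response.cursor) : EMPTY)),
  // Turn each page of result objects into a page of ids.
  map((response) => response.results.map((result) => result.id)),
  // The explicit seed gives the accumulator a definite array type.
  reduce((all, pageIds) => all.concat(pageIds), [] as string[])
);

let collected: string[] = [];
ids$.subscribe((ids) => (collected = ids));
console.log(collected.length, requestCount); // 525 6
```

Passing a seed to reduce() matters here: without one, the first emitted page would be used as the initial accumulator.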

It's not a bad idea to throw a take(1000) in the pipe too, to handle the pathological case when the server does not return an end-of-data indication. Some large-but-finite value will do.
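A rough sketch of that guard, using a hypothetical fetchPage() that misbehaves by always returning a cursor. MAX_PAGES is deliberately tiny here so the effect is visible; the placement of take() after expand() is what caps the number of responses.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, reduce, take } from 'rxjs/operators';

interface Page {
  data: number[];
  cursor?: number;
}

// Hypothetical misbehaving endpoint: it never signals end-of-data.
function fetchPage(cursor = 0): Observable<Page> {
  return of({ data: [cursor], cursor: cursor + 1 });
}

const MAX_PAGES = 5; // small for the demo; a real limit would be much larger

const items$ = fetchPage().pipe(
  expand((response) =>
    response.cursor === undefined ? EMPTY : fetchPage(response.cursor)
  ),
  // Stop after MAX_PAGES responses even though the cursor never runs out.
  take(MAX_PAGES),
  reduce((all, { data }) => all.concat(data), [] as number[])
);

let items: number[] = [];
items$.subscribe((value) => (items = value));
console.log(items); // [ 0, 1, 2, 3, 4 ]
```

Note that take() completes the stream normally, so the subscriber cannot tell a truncated result from a complete one unless the limit is checked separately.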

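It's also safe to mutate the accumulated array inside reduce() and return it, since arrays are passed by reference; this avoids creating an intermediate array for every page. One caveat: the seed [] is created once, when the pipeline is built, so a mutated seed is shared by every subscription to items$. That is harmless as long as the pipeline is subscribed to only once.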
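A small sketch of in-place accumulation, wrapped in defer() so that each subscription gets a fresh seed array. The pages$ source is a hypothetical stand-in for the expanded responses, and using defer() here is one possible arrangement rather than something the original answer prescribes.

```typescript
import { defer, Observable, of } from 'rxjs';
import { reduce } from 'rxjs/operators';

// Three hypothetical pages standing in for the responses emitted by expand().
const pages$: Observable<{ data: number[] }> = of(
  { data: [1, 2] },
  { data: [3, 4] },
  { data: [5] }
);

// defer() rebuilds the pipeline, and with it the seed array, per subscription.
const items$ = defer(() =>
  pages$.pipe(
    reduce((all, { data }) => {
      all.push(...data); // mutate in place instead of allocating a new array
      return all;
    }, [] as number[])
  )
);

let first: number[] = [];
let second: number[] = [];
items$.subscribe((value) => (first = value));
items$.subscribe((value) => (second = value));
console.log(first, second); // [ 1, 2, 3, 4, 5 ] [ 1, 2, 3, 4, 5 ]
```

Without defer(), the second subscription would start from the array already filled by the first one.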

Upvotes: 1

BizzyBob

Reputation: 14765

Let's use a simplified interface for this example:

interface Response {
  data: number[];
  cursor?: number; // undefined on the last page
}

function fetch(cursor?: number): Observable<Response> { ... }

To recursively call fetch() as long as the cursor is defined, the basic flow would look something like this:

const items$ = fetch().pipe(
  expand(response => fetch(response.cursor)),
  takeWhile(response => !!response.cursor, true),
  reduce((all, {data}) => all.concat(data), [])
);
  • expand receives each emission and keeps calling fetch(), passing the cursor from the prior response
  • takeWhile will end the "expand stream" when it receives a response without a cursor
    • we pass true for the "inclusive" parameter so as not to discard the data that came back when the cursor was undefined
  • reduce will accumulate all the results into a single array

Output:

> fetch(0) { data: Array[3], cursor: 1 }
> fetch(1) { data: Array[3], cursor: 2 }
> fetch(2) { data: Array[3], cursor: 3 }
> fetch(3) { data: Array[3], cursor: undefined }
items$ [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 ]

Here's a working StackBlitz example.
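For readers without access to the linked example, here is a minimal self-contained sketch of the takeWhile() variant. The in-memory fetchPage() is hypothetical (it stands in for fetch() and serves four pages of three numbers each), and the cursor test is written as an explicit undefined check rather than !! so that a cursor of 0 would not be mistaken for the end of data.

```typescript
import { Observable, of } from 'rxjs';
import { expand, reduce, takeWhile } from 'rxjs/operators';

interface Page {
  data: number[];
  cursor?: number;
}

const LAST_PAGE = 3;

// Hypothetical in-memory fetch(): page n holds three consecutive numbers,
// and the last page comes back without a cursor.
function fetchPage(cursor = 0): Observable<Page> {
  const first = cursor * 3 + 1;
  return of({
    data: [first, first + 1, first + 2],
    cursor: cursor < LAST_PAGE ? cursor + 1 : undefined,
  });
}

const items$ = fetchPage().pipe(
  // Request the next page using the cursor from the previous response.
  expand((response) => fetchPage(response.cursor)),
  // Stop at the first response without a cursor, but keep that response.
  takeWhile((response) => response.cursor !== undefined, true),
  reduce((all, { data }) => all.concat(data), [] as number[])
);

let items: number[] = [];
items$.subscribe((value) => (items = value));
console.log(items); // [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 ]
```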

Upvotes: 5
