Reputation: 1819
I have an API which searches 100 records at a time. It returns a cursor key in the response if it has further results.
For example, the 1st request payload:
{
cursor: "",
query: "abc"
}
Response:
{
results: [...],
totalCount: 525,
cursor: "1234"
}
2nd request payload:
{
cursor: "1234",
query: "abc"
}
This will give the next 100 records.
So, in all, I will be making 6 calls for 525 records.
Since this API relies on a cursor, it has to be called recursively. I was thinking of using RxJS's expand operator along with the pluck operator to extract the id of each result object.
Result object structure: { id: "123" }
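For reference, the request/response shapes look roughly like this (illustrative TypeScript; the RecordIds alias used below is assumed to be a plain string array):
interface SearchPayload {
  cursor: string;            // empty string on the first request
  query: string;
  limit?: number;            // page size, 100 in my case
  returnedFields?: string[]; // e.g. ['id']
}

interface SearchResponse {
  results: { id: string }[];
  totalCount: number;        // e.g. 525
  cursor?: string;           // present only when further results exist
}

type RecordIds = string[];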
My aim is to collate all 525 record ids and emit them to the subscriber of this API call. I tried the below but could not get it into a working shape.
private searchRecordsByQueryWithCursor(
query: string,
cursor = ''
): Observable<RecordIds> {
const payload = {
limit: 100,
query: query,
returnedFields: ['id'],
cursor: cursor
};
return this.coreService.fetchSearchResultsByCursor(payload).pipe(
expand((response) => {
if (response.cursor) {
return this.coreService
.fetchSearchResultsByCursor({
...payload,
cursor: response.cursor
})
.pipe(
catchError(() => {
return of({
results: []
});
})
);
}
return EMPTY;
}),
pluck('results'),
reduce((acc: RecordIds, val) => {
return [...acc, ...val];
})
);
}
Could anyone guide me on the correct structure of the RxJS pipeline?
Output required: [ '123', '456', '789', ... ] (all 525 ids).
Upvotes: 1
Views: 1690
Reputation: 63
As an alternative to terminating via takeWhile() as in the example from BizzyBob, another option is simply to do the test directly inside of expand(), analogous to the base case in a recursive function:
const items$ = fetch().pipe(
expand((response) => {
if (!response.cursor) {
return EMPTY;
}
return fetch(response.cursor);
}),
reduce((all, { data }) => all.concat(data), [])
);
This works because reduce() consumes the data part of the response before expand() is called to interrogate the cursor part of that same response. Returning EMPTY completes the stream (and terminates the recursion).
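For a consumer, that means items$ emits exactly once, after the last page has been fetched (a minimal usage sketch, reusing the items$ from above):
items$.subscribe((all) => {
  // fires once the recursion has completed
  console.log(all.length);
});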
Here's a working StackBlitz example.
It's not a bad idea to throw a take(1000) in the pipe too, to handle the pathological case when the server does not return an end-of-data indication. Some large-but-finite value will do.
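A sketch of where such a guard could sit, reusing the pipeline above (1000 is an arbitrary upper bound on the number of pages):
const items$ = fetch().pipe(
  expand((response) => (response.cursor ? fetch(response.cursor) : EMPTY)),
  take(1000), // safety valve in case the server never stops returning a cursor
  reduce((all, { data }) => all.concat(data), [])
);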
It's also safe to modify the accumulated value in reduce() and return the modified value, since it's a by-reference type; this obviates having to create multiple intermediate array values.
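For instance, this mutating variant of the same reduce avoids the intermediate arrays (a sketch, not a requirement):
reduce((all, { data }) => {
  all.push(...data); // mutate the accumulator in place
  return all;        // and return the same array reference
}, [] as number[])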
Upvotes: 1
Reputation: 14765
Let's use a simplified interface for this example:
interface Response {
  data: number[];
  cursor?: number;
}
function fetch(cursor?: number): Observable<Response> { ... }
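For local experimentation, fetch() could be stubbed with something like this (a sketch; the page size, total count, and delay are made up):
import { Observable, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const PAGE_SIZE = 3;
const TOTAL = 12;

// Serves pages of numbers plus a cursor for the next page;
// the cursor is undefined once everything has been served.
function fetch(cursor = 0): Observable<Response> {
  const start = cursor * PAGE_SIZE;
  const data = Array.from({ length: PAGE_SIZE }, (_, i) => start + i + 1);
  const nextCursor = start + PAGE_SIZE < TOTAL ? cursor + 1 : undefined;
  return of({ data, cursor: nextCursor }).pipe(delay(100));
}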
To recursively call fetch() as long as the cursor is defined, the basic flow would look something like this:
const items$ = fetch().pipe(
expand(response => fetch(response.cursor)),
takeWhile(response => !!response.cursor, true),
reduce((all, {data}) => all.concat(data), [])
);
- expand will accept the previous emission and keep calling fetch(), passing the cursor from the prior response
- takeWhile will end the "expand stream" when it receives a response without a cursor; true is passed for the "inclusive" parameter so as not to discard the data that came back when the cursor was undefined
- reduce will accumulate all the results into a single array
Example run:
> fetch(0) { data: Array[3], cursor: 1 }
> fetch(1) { data: Array[3], cursor: 2 }
> fetch(2) { data: Array[3], cursor: 3 }
> fetch(3) { data: Array[3], cursor: undefined }
items$ [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 ]
Here's a working StackBlitz example.
Upvotes: 5