tim.breeding

Reputation: 121

In rxjs, how do I chain mapping through arrays of data received from different APIs?

I'm calling an API and receiving an array of results, I'm checking for pagination and if more pages exist I call the next page, repeat until no more pages.

For each array of results, I call another endpoint and do the exact same thing: I receive an array of results, check for another page, and call the endpoint again. Wash, rinse, repeat.

For instance:

I want to grab a list of countries that might be a paginated response, then for each country I want to grab a list of cities, which might also be paginated. And for each city I execute a set of transformations and then store in a database.

I already tried this, but got stuck:


const grabCountries = Observable.create(async (observer) => {
    const url = 'http://api.com/countries'
    let cursor = url
    do {

        const results = await (await fetch(cursor)).json()

        // results = { 
        //   data: [ 'Canada', 'France', 'Spain' ],
        //   next: '47asd8f76358df8f4058898fd8fab'
        // }

        results.data.forEach(country => { observer.next(country) })

        cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)

    observer.complete()
})


const getCities = {
    next: async (country) => {
        const url = 'http://api.com/cities'
        let cursor = url
        do {

            const results = await (await fetch(cursor)).json()

            // results = {
            //     data: [ 
            //         'Montreal', 'Toronto', 
            //         'Paris', 'Marseilles', 
            //         'Barcelona', 'Madrid' 
            //     ],
            //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
            // }

            results.data.forEach(city => { 
                `**** What do I do here?? ****` 
            })

            cursor = results.next ? `${url}/${results.next}` : undefined

        } while(cursor)
    }
}

I tried a few approaches:

Making a subject (sometimes I'll need to do parallel processing based on the results of 'grabCountries'. For example, I may want to store the countries in a DB in parallel with grabbing the cities.)

const intermediateSubject = new Subject()

intermediateSubject.subscribe(storeCountriesInDatabase)
intermediateSubject.subscribe(getCities)

I also tried piping and mapping, but it seems like it's basically the same thing.

As I was writing this I thought of this solution and it seems to be working fine, I would just like to know if I'm making this too complicated. There might be cases where I need to make more than just a few API calls in a row. (Imagine: Countries => States => Cities => Bakeries => Reviews => Comments => Replies.) So this weird mapping over another observer callback pattern might get nasty.

So this is what I have now basically:

// grabCountries stays the same as above, but the rest is as follows:

const grabCities = (country) =>
  Observable.create(async (observer) => {
    const url = `http://api.com/${country}/cities`
    let cursor = url
    do {
      const results = await (await fetch(cursor)).json()

      // results = {
      //     data: [
      //         'Montreal', 'Toronto',
      //         'Paris', 'Marseilles',
      //         'Barcelona', 'Madrid'
      //     ],
      //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
      // }

      results.data.forEach(city => {
        observer.next(city)
      })

      cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)

    observer.complete()
  })

const multiCaster = new Subject()

grabCountries.subscribe(multiCaster)
multiCaster.pipe(map((country) => {
    grabCities(country).pipe(map(saveCityToDB)).subscribe()
})).subscribe()
multiCaster.pipe(map(saveCountryToDB)).subscribe()

tl;dr - I call an API that returns a paginated set of results in an array, and I need to map through each item and call another API that returns another paginated set of results, each set also in an array.

Is nesting one observable inside another and mapping through the results via 'callApiForCountries.pipe(map(forEachCountryCallApiForCities))' the best method or do you have any other recommendations?

Upvotes: 3

Views: 236

Answers (3)

tim.breeding

Reputation: 121

OK, so I have spent a lot of brain power on this and have come up with two solutions that seem to be working.

const nestedFlow = () => {
	fetchAccountIDs.pipe(map(accountIds => {
		getAccountPostIDs(accountIds) // Has the do loop for paging inside
			.pipe(
				map(fetchPostDetails),
				map(mapToDBFormat),
				map(storeInDB)
			).subscribe()
	})).subscribe()
}


const expandedFlow = () => {
	fetchAccountIDs.subscribe((accountId) => {
		// accountId { accountId: '345367geg55sy'}
		getAccountPostIDs(accountId).pipe(
			expand((results) => {
				/*
				results : {
					postIDs: [
						131424234,
						247345345,
					],
					cursor: '374fg8v0ggfgt94',
				}
				*/
				const { postIDs, cursor } = results
				if (cursor) return getAccountPostIDs({...accountId, cursor})
				return { postIDs, cursor }
			}),
			takeWhile(hasCursor, true), // recurs until cursor is undefined
			concatMap(data => data.postIDs), 
			map(data => ({ post_id: data })), 
			map(fetchPostDetails), 
			map(mapToDBFormat), 
			map(storeInDB) 
		).subscribe()
	})
}

Both seem to be working with similar performance. I read somewhere that leaving the data flow is a bad practice and you should pipe everything, but I don't know how to eliminate the first exit in 'expandedFlow' because 'expand' needs to return an observable. Maybe it can be done.

Now I just have to solve the race condition between the time 'complete' is called in getAccountPostIDs and the time the last record is stored in the DB. Currently in my test, observer.complete fires before 3 of the upsert actions have finished.

Any comments are appreciated and I hope this helps someone out in the future.

Upvotes: 1

Fan Cheung

Reputation: 11360

Here's code that should work for sequentially crawling the next URL. You start with {next: url} and keep expanding until res.next is no longer available.

of({ next: 'http://api.com/cities' }).pipe(
    // fetchPage is a hypothetical helper returning an Observable of { data, next }
    expand(res => res.next ? fetchPage(res.next) : EMPTY),
    takeWhile(res => res.next !== undefined, true)
).subscribe()

Upvotes: 1

Dzhavat Ushev

Reputation: 735

What you need is the expand operator. It behaves recursively, so it fits the idea of paginated results.

Upvotes: 0
