myts999

Multiple REST API calls on 1M data entries using Databricks + Scala?

I am trying to use an API to retrieve all the buildings in LA County. The website for the dataset is here

The county has 3 million buildings; I've filtered them down to roughly 1 million. You can see the filter in QUERY_PARAMS in the code.

I've tried using Python but, unsurprisingly, retrieving 1 million data points still takes a long time.

From the ESRI developer website, I understand that a single API call is limited to 10,000 results. However, I need to retrieve all 1 million buildings, so I have to page through the results with many calls.
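To put numbers on the pagination, here is a minimal sketch of the arithmetic. The helper names `page_offsets` and `request_count` are made up for illustration, and the 10,000 figure assumes the limit I read about actually applies to this layer.

```python
import math


def page_offsets(total_count: int, page_size: int) -> list[int]:
    """Return the resultOffset value for each page needed to cover total_count records."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return list(range(0, total_count, page_size))


def request_count(total_count: int, page_size: int) -> int:
    """Number of requests needed when each request returns at most page_size records."""
    return math.ceil(total_count / page_size)


# Roughly 1 million records at 1000 per request (the page size in my code).
print(request_count(1_000_000, 1000))    # 1000
# Same records at 10,000 per request (assumed maximum, not verified for this layer).
print(request_count(1_000_000, 10_000))  # 100
```

So a larger page size would cut the number of round trips by a factor of ten, if the service allows it.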

Here is my code so far. Even after using async functions, it still takes about 10 minutes:

import aiohttp
import asyncio
import nest_asyncio

nest_asyncio.apply()  # Required if running in Jupyter Notebook

# Base URL for the API query
BASE_URL = "https://services.arcgis.com/RmCCgQtiZLDCtblq/arcgis/rest/services/Countywide_Building_Outlines/FeatureServer/1/query"

# Parameters for the query
QUERY_PARAMS = {
    "where": "(HEIGHT < 33) AND UseType = 'RESIDENTIAL' AND SitusCity IN('LOS ANGELES CA','BEVERLY HILLS CA',  'PALMDALE')",
    "outFields": "*",
    "outSR": "4326",
    "f": "json",
    "resultRecordCount": 1000,  # Fetch 1000 records per request
}

async def fetch_total_count():
    """Fetch total number of matching records."""
    params = QUERY_PARAMS.copy()
    params["returnCountOnly"] = "true"

    async with aiohttp.ClientSession() as session:
        async with session.get(BASE_URL, params=params) as response:
            data = await response.json()
            return data.get("count", 0)  # Extract total count

async def fetch(session, offset):
    """Fetch a batch of records using pagination."""
    params = QUERY_PARAMS.copy()
    params["resultOffset"] = offset

    async with session.get(BASE_URL, params=params) as response:
        return await response.json()

async def main():
    """Fetch all records asynchronously with pagination."""
    all_data = []
    total_count = await fetch_total_count()
    print(f"Total Records to Retrieve: {total_count}")

    semaphore = asyncio.Semaphore(10)  # Limit concurrency to prevent API overload

    async with aiohttp.ClientSession() as session:
        async def bound_fetch(offset):
            async with semaphore:
                data = await fetch(session, offset)
                return data

        # Generate tasks for pagination
        tasks = [bound_fetch(offset) for offset in range(0, total_count, 1000)]
        results = await asyncio.gather(*tasks)

        for data in results:
            if "features" in data:
                all_data.extend(data["features"])

    print(f"Total Records Retrieved: {len(all_data)}")
    return all_data

# Run the async function
all_data = asyncio.run(main())

I've turned to Databricks + Scala to speed up the data retrieval, but I'm brand new to big data computing. As far as I understand, you need to "parallelize" your API calls and combine the results into one big dataframe?
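To make clear what I mean by "parallelize and combine", here is a rough local sketch of the idea in Python. It uses a plain thread pool from the standard library, not Spark, and it makes no network calls: `fake_fetch_page` is a hypothetical stand-in for a real request, and the `OBJECTID` attribute is an assumed field name. My guess is that the Databricks version would distribute the offsets across workers in a similar way, but I don't know if that is right.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def fetch_all(
    offsets: Iterable[int],
    fetch_page: Callable[[int], dict],
    max_workers: int = 8,
) -> list[dict]:
    """Fetch every page in parallel and flatten the results into one list of rows."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the input offsets.
        pages = list(pool.map(fetch_page, offsets))

    rows = []
    for page in pages:
        # Each page is a parsed JSON response; pages without "features" are skipped.
        for feature in page.get("features", []):
            rows.append(feature.get("attributes", {}))
    return rows


def fake_fetch_page(offset: int) -> dict:
    """Hypothetical stand-in for one HTTP request; returns 3 fake records per page."""
    return {
        "features": [{"attributes": {"OBJECTID": offset + i}} for i in range(3)]
    }


rows = fetch_all(range(0, 9, 3), fake_fetch_page)
print(len(rows))  # 9
```

The flat list of attribute dictionaries at the end is what I would then want to turn into a single dataframe.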

Can someone offer suggestions?

Upvotes: -1

Views: 40
