MMMdata

Reputation: 867

Run Multiple BigQuery Jobs via Python API

I've been working off of Google Cloud Platform's Python API library. I've had much success with these API samples out-of-the-box, but I'd like to streamline it a bit further by combining the three queries I need to run (and subsequent tables that will be created) into a single file. Although the documentation mentions being able to run multiple jobs asynchronously, I've been having trouble figuring out the best way to accomplish that.

Thanks in advance!

Upvotes: 1

Views: 4601

Answers (2)

Philippe Hebert

Reputation: 2028

BigQuery jobs are always async by default; that said, retrieving a job's result is not. As of Q4 2021, the Python client library does not offer a proper async way to collect results: each call to job.result() blocks the calling thread, which makes it impossible to use with a single-threaded event loop like asyncio. The best way to collect multiple job results is therefore multithreading:

from typing import Dict
from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery

client: bigquery.Client = bigquery.Client()

def run(name: str, statement: str):
    # client.query() returns immediately; .result() blocks the thread
    return name, client.query(statement).result()

def run_all(statements: Dict[str, str]):
    with ThreadPoolExecutor() as executor:
        jobs = [executor.submit(run, name, statement)
                for name, statement in statements.items()]
        return dict(job.result() for job in jobs)
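If you want to see the fan-out behavior without a live BigQuery project, the same pattern can be exercised with a stand-in blocking call. This is only a sketch: the sleep, the table names, and the SELECT statements below are made up for illustration, and the dummy run() replaces the real client.query(...).result() call:

```python
import time
from typing import Dict
from concurrent.futures import ThreadPoolExecutor

def run(name: str, statement: str):
    time.sleep(0.1)  # stands in for the blocking client.query(...).result()
    return name, f"rows for {name}"

def run_all(statements: Dict[str, str]) -> Dict[str, str]:
    with ThreadPoolExecutor() as executor:
        jobs = [executor.submit(run, name, stmt)
                for name, stmt in statements.items()]
        return dict(job.result() for job in jobs)

start = time.monotonic()
results = run_all({
    "table_a": "SELECT 1",
    "table_b": "SELECT 2",
    "table_c": "SELECT 3",
})
elapsed = time.monotonic() - start
print(results["table_b"])  # rows for table_b
print(elapsed < 0.25)      # True: the three 0.1 s "queries" ran concurrently
```

Because the three blocking calls run on separate threads, the whole batch finishes in roughly the time of the slowest single call rather than the sum of all three.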

P.S.: Some credits are due to @Fredrik Håård for this answer :)

Upvotes: 2

Mikhail Berlyant

Reputation: 172994

The idea of running multiple jobs asynchronously is to create/prepare as many jobs as you need and kick them all off via the jobs.insert API (importantly, you should either collect all of the respective job IDs or set your own - they just need to be unique). These API calls return immediately, so you can kick everything off "very quickly" in one loop.

Meanwhile, you need to poll the status of those jobs (in a loop), and as soon as a job is done you can process its result as needed.
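The kick-off-then-poll pattern described above can be sketched as follows. This is a runnable simulation, not real BigQuery code: the jobs are stand-in futures that happen to expose the same done()/result() shape as the client library's QueryJob, and the job IDs and sleep times are invented. With the real library, each job would instead be started with client.query(sql, job_id=...), which also returns immediately:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_job(seconds: float, value: str):
    time.sleep(seconds)  # stands in for query execution on the server
    return value

executor = ThreadPoolExecutor()

# Kick off every job up front; submit() returns immediately, like jobs.insert.
jobs = {
    "job-1": executor.submit(fake_job, 0.05, "result of query 1"),
    "job-2": executor.submit(fake_job, 0.10, "result of query 2"),
}

# Poll repeatedly; as soon as a job reports done, process its result.
pending = dict(jobs)
results = {}
while pending:
    for job_id, job in list(pending.items()):
        if job.done():
            results[job_id] = job.result()  # no longer blocks once done
            del pending[job_id]
    time.sleep(0.01)  # back off between status checks

executor.shutdown()
print(sorted(results))  # ['job-1', 'job-2']
```

The key property being illustrated is that starting a job and waiting for it are separate steps, so all jobs can be started before any result is collected.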

You can find the details in Running asynchronous queries

Upvotes: 2
