D M

Reputation: 11

Unable to fetch a BigQuery table count using the Airflow BigQueryHook: the count (query result) does not appear when I print the output

I am trying to run the code below from an Airflow DAG to get the total row count of a table, so I can validate that the count is retrieved. I store the result in a variable and print that variable, but the printed output does not show the query result. It looks like I am missing something. Is there a way to store the query result in a variable?

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

def count():
    config={
        "job_type":"Query",
        "query":{
            "query":"select count(*) from project_id.dataset.table",
            "useLegacySql":False,
            "allow_large_results":True,
            "location":"location_name"
        }}
    hook = BigQueryHook(gcp_conn_id='connect_id')
    target_count=hook.insert_job(configuration=config, nowait=False)
    print(target_count)

with models.DAG(
        'count_test',
        start_date = datetime(20, 9, 1),
        schedule_interval = None,
        catchup = False,
        ) as dag:

    from airflow.operators.python_operator import PythonOperator
    count_bqHook = PythonOperator(
        task_id='task1',
        python_callable=count
    )

chain( count_bqHook )

Upvotes: 0

Views: 1085

Answers (1)

Scott B

Reputation: 2994

You may consider the approach below:

In your original code, target_count=hook.insert_job(configuration=config, nowait=False) stores a QueryJob object, not the query result, which is why printing it does not show the count. Per the QueryJob documentation, call its result() method, which waits for the job to complete and returns the rows. You can then print the value so it appears in your Cloud Composer Airflow logs.
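To make the difference concrete, here is a minimal, Airflow-free sketch. StubQueryJob is a hypothetical stand-in for the object insert_job returns (it is not the real google-cloud-bigquery class): printing the object only shows its description, while iterating result() yields the rows. With a real QueryJob, result() also blocks until the query finishes.

```python
from types import SimpleNamespace


class StubQueryJob:
    """Hypothetical stand-in for a BigQuery QueryJob (illustration only)."""

    def __init__(self, job_id, rows):
        self.job_id = job_id
        self._rows = rows

    def __repr__(self):
        # Printing the job shows only a description of the job, not the query output.
        return f"StubQueryJob<id={self.job_id}>"

    def result(self):
        # A real QueryJob.result() waits for the job and returns the rows;
        # this stub just hands back its canned rows.
        return iter(self._rows)


job = StubQueryJob("job_123", [SimpleNamespace(total_count=42)])
print(job)  # StubQueryJob<id=job_123>  (no count here)
for row in job.result():
    print(f"PRINTING TOTAL_COUNT --> {row.total_count}")  # PRINTING TOTAL_COUNT --> 42
```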

Below is the code I used:

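import datetime

from airflow import models
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

PROJECT_NAME = "project_id"  # placeholder: your GCP project ID
default_args = {"start_date": datetime.datetime(2021, 1, 1)}  # placeholder: use your own default_args

def count():
    config={
        "job_type":"Query",
        "query":{
            "query":"select count(*) as total_count from project_id.dataset.table",
            "useLegacySql":False,
            "allow_large_results":True,
            "location":"location_name"
        }}
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
    # insert_job returns a QueryJob; with nowait=False it waits for the job to complete
    target_count=hook.insert_job(configuration=config, project_id=PROJECT_NAME, nowait=False)
    # result() returns the rows; the "total_count" alias exposes the value as row.total_count
    for row in target_count.result():
        print(f'PRINTING TOTAL_COUNT --> {row.total_count}')

with models.DAG(
        'count_test',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
        catchup = False,
        ) as dag:
    from airflow.operators.python_operator import PythonOperator
    count_bqHook = PythonOperator(
        task_id='task1',
        python_callable=count
    )

    count_bqHook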

To extract and print the specific value you need, you may refer to the Display BQ Query Result documentation.

[Screenshot: my Cloud Composer Airflow logs output]

[Screenshot: the same query executed in BigQuery, to compare the actual query result with the output in the Cloud Composer Airflow logs]
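The question also asks how to keep the result in a variable rather than only printing it. A minimal sketch, assuming the query returns exactly one row with a total_count column (as in the aliased query above): the hypothetical helper extract_count reads that single value out of whatever insert_job(...) returned. A BigQuery Row also supports positional access (row[0]) if you prefer not to rely on the alias. The stub job at the bottom only stands in for a real QueryJob so the sketch runs without Airflow or BigQuery.

```python
from types import SimpleNamespace


def extract_count(job, column="total_count"):
    """Wait for a query job and return the single value it produced.

    `job` is anything with a result() method that yields row objects,
    e.g. the QueryJob returned by BigQueryHook.insert_job(...).
    """
    rows = list(job.result())  # for a real QueryJob this blocks until the query finishes
    if len(rows) != 1:
        raise ValueError(f"expected exactly one row, got {len(rows)}")
    return getattr(rows[0], column)


# Hypothetical stand-in for the job object, so the sketch runs without BigQuery.
stub_job = SimpleNamespace(result=lambda: [SimpleNamespace(total_count=7)])

target_count = extract_count(stub_job)
print(target_count)  # 7
```

If count() ends with return extract_count(target_count), PythonOperator pushes the returned value to XCom under the key return_value, so a downstream task can read the count instead of scraping it from the logs.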

Upvotes: 1
