GURUDAS K S

Reputation: 53

How to create an external table in Google BigQuery for a Parquet file from an Airflow DAG

I am trying to create an external table in BigQuery for a Parquet file that is present in a GCS bucket, but when I run the code below in Airflow I get the following error:

ERROR:

ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project_dev/datasets/dataset_dev/tables?prettyPrint=false: When defining a table with an ExternalDataConfiguration, a schema must be present on either the Table or the ExternalDataConfiguration. If the schema is present on both, the schemas must be the same.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/bigquery.py", line 1210, in execute
    encryption_configuration=self.encryption_configuration
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 675, in create_external_table
    table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 387, in create_empty_table
    table=table, exists_ok=exists_ok, retry=retry
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 622, in create_table
    timeout=timeout
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 640, in _call_api
    return call()
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 483, in api_request

DAG CODE:

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table",
    bucket='temp_bucket',
    source_objects='data/part.parquet',
    destination_project_dataset_table="project_dev.dataset_dev.parquet_table",
    file_format='PARQUET',
    impersonation_chain='[email protected]',
    dag=dag,
)

Ideally, it should auto-detect the schema, but it doesn't. I also tried setting autodetect=True, but that did not work either.
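
Roughly, the variant with auto-detection enabled would look like this (a sketch; the same call as above with the autodetect flag added):

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table",
    bucket='temp_bucket',
    source_objects='data/part.parquet',
    destination_project_dataset_table="project_dev.dataset_dev.parquet_table",
    file_format='PARQUET',
    autodetect=True,  # enable schema auto-detection; still raised the same 400 error
    impersonation_chain='[email protected]',
    dag=dag,
)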

Upvotes: 0

Views: 1871

Answers (1)

Ricco D

Reputation: 7277

You are encountering the 400 error because the parameters impersonation_chain and file_format are not accepted by BigQueryCreateExternalTableOperator(). I removed impersonation_chain, changed file_format to source_format, and passed a list for source_objects instead of a string.

I was able to make it work using the following parameters:

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],  # pass a list
    destination_project_dataset_table="my-project.my_dataset.parquet_table",
    source_format='PARQUET',  # use source_format instead of file_format
)
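
Alternatively, per the error message, you can satisfy the schema requirement by declaring it explicitly via the operator's schema_fields parameter. A minimal sketch, assuming the Parquet file has an INTEGER column id and a STRING column name (replace these with your actual columns):

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],
    destination_project_dataset_table="my-project.my_dataset.parquet_table",
    source_format='PARQUET',
    schema_fields=[  # explicit schema; the columns here are assumed examples
        {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
    ],
)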

For testing, I used this sample Parquet file. I'm using Composer version 1.17.2 and Airflow version 1.10.15. See the full DAG used:

import datetime
import airflow
from airflow.operators import bash_operator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'parquet_load_to_bq',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    create_imp_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_imp_external_table",
        bucket='my-bucket',
        source_objects=["/data/userdata1.parquet"],  # pass a list
        destination_project_dataset_table="my-project.my_dataset.parquet_table",
        source_format='PARQUET',  # use source_format instead of file_format
    )

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

create_imp_external_table >> print_dag_run_conf
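
Once the task has run, one way to sanity-check the external table is to query it with the BigQuery Python client (a sketch; the project, dataset, and table names assume the values used above):

from google.cloud import bigquery

# Count rows in the external table created by the DAG.
client = bigquery.Client(project="my-project")
query = "SELECT COUNT(*) AS row_count FROM `my-project.my_dataset.parquet_table`"
for row in client.query(query).result():
    print(row.row_count)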

See logs of sample run:

[screenshot: Airflow logs of the sample run]

BQ data:

[screenshot: the external table's data in BigQuery]

Upvotes: 1
