I am trying to create an external table in BigQuery for a Parquet file that is present in a GCS bucket. But when I run the code below in Airflow, I get the following error:
ERROR:
ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project_dev/datasets/dataset_dev/tables?prettyPrint=false: When defining a table with an ExternalDataConfiguration, a schema must be present on either the Table or the ExternalDataConfiguration. If the schema is present on both, the schemas must be the same.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/bigquery.py", line 1210, in execute
    encryption_configuration=self.encryption_configuration
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 675, in create_external_table
    table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 387, in create_empty_table
    table=table, exists_ok=exists_ok, retry=retry
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 622, in create_table
    timeout=timeout
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 640, in _call_api
    return call()
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 483, in api_request
DAG CODE:
create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id=f"create_imp_external_table",
    bucket='temp_bucket',
    source_objects='data/part.parquet',
    destination_project_dataset_table=f"project_dev.dataset_dev.parquet_table",
    file_format='PARQUET',
    impersonation_chain='[email protected]',
    dag=dag
)
Ideally, the schema should be autodetected, but that is not happening. I also tried setting autoDetect=True, but that did not work either.
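For reference, this is roughly the external table definition I expect the operator to build; a minimal sketch using the google-cloud-bigquery client directly (same placeholder bucket, path, and table names as in my DAG, with autodetect set only because the error above says a schema has to come from somewhere):

from google.cloud import bigquery

client = bigquery.Client(project="project_dev")

# External data configuration pointing at the Parquet file in GCS.
external_config = bigquery.ExternalConfig("PARQUET")
external_config.source_uris = ["gs://temp_bucket/data/part.parquet"]
external_config.autodetect = True  # infer the schema from the Parquet file itself

# Attach the external configuration to the table definition and create it.
table = bigquery.Table("project_dev.dataset_dev.parquet_table")
table.external_data_configuration = external_config
client.create_table(table, exists_ok=True)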
You are encountering the 400 error because the parameters impersonation_chain and file_format are not accepted by BigQueryCreateExternalTableOperator(). I removed impersonation_chain, changed file_format to source_format, and passed source_objects as a list instead of a string.
I was able to make it work using the following parameters:
create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id=f"create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],  # pass a list
    destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
    source_format='PARQUET',  # use source_format instead of file_format
)
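If autodetection ever gives you trouble, the operator also accepts a schema_fields parameter so you can pin the schema explicitly; a minimal sketch (the column names below are placeholders, adjust them to match your Parquet file):

create_external_table_with_schema = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table_with_schema",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],
    destination_project_dataset_table="my-project.my_dataset.parquet_table",
    source_format='PARQUET',
    # Explicit schema in the standard BigQuery field format.
    schema_fields=[
        {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "first_name", "type": "STRING", "mode": "NULLABLE"},
    ],
)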
For testing, I used a sample Parquet file (userdata1.parquet). I'm using Composer version 1.17.2 and Airflow version 1.10.15. See the full DAG I used:
import datetime

import airflow
from airflow.operators import bash_operator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'parquet_load_to_bq',
        catchup=False,  # pass as a keyword argument, not a string
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    create_imp_external_table = BigQueryCreateExternalTableOperator(
        task_id=f"create_imp_external_table",
        bucket='my-bucket',
        source_objects=["/data/userdata1.parquet"],  # pass a list
        destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
        source_format='PARQUET',  # use source_format instead of file_format
    )

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

    create_imp_external_table >> print_dag_run_conf
The logs of a sample run and the resulting BQ data confirmed that the external table was created and queryable.
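If you want to verify the result outside the console, you can query the external table with the BigQuery Python client; a quick sketch assuming the same project and table names as above:

from google.cloud import bigquery

# The query reads the Parquet file in GCS through the external table,
# so a non-zero count confirms the table definition works end to end.
client = bigquery.Client(project="my-project")
query = "SELECT COUNT(*) AS row_count FROM `my-project.my_dataset.parquet_table`"
for row in client.query(query).result():
    print(row.row_count)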