Reputation: 1
I'm migrating to Composer 2.4.1 and Airflow 2.5.3. Some DAGs that use BigQueryCreateExternalTableOperator return an error when creating the external table, even though I pass the file type in the source_format parameter according to the official documentation. This is the function responsible for creating the external table in BigQuery:
def operator_bq_create_external_table(
        self,
        task_name: str,
        bq_project_dataset_table_destination: str,
        bucket: str,
        prefix: str,
        format: str,
        schema: list = [],
        autodetect: bool = True,
        skip_rows: int = 0,
        delimiter: str = ',') -> BigQueryCreateExternalTableOperator:
    project, _, __ = self.split_project_dataset_table(bq_project_dataset_table_destination)
    return BigQueryCreateExternalTableOperator(
        task_id=f"create_external_table_{task_name}",
        bucket=bucket,
        destination_project_dataset_table='_'.join([
            bq_project_dataset_table_destination,
            "external",
            self.get_logical_date_template('%Y%m%d%H')
        ]),
        source_objects=[
            f"{self.get_logical_date_template(prefix)}*.{format.lower()}"
        ],
        schema_fields=schema,
        autodetect=True,
        source_format=format.lower(),
        skip_leading_rows=skip_rows if format.upper() == 'CSV' else 0,
        field_delimiter=delimiter if format.upper() == 'CSV' else None
    )
I tested it without passing the source_format parameter; the table was created, but without a schema, and BigQuery defaulted the format to CSV even though the table was created from .parquet files.
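The error message ("Invalid value for sourceFormat: parquet is not a valid value") suggests the API rejects lowercase values: BigQuery's sourceFormat field expects uppercase identifiers such as "CSV", "PARQUET", "AVRO", "ORC", or "NEWLINE_DELIMITED_JSON", while the function above passes format.lower(). A minimal sketch of a normalizing helper (my own naming, not part of the operator) that could be used instead:

```python
# Uppercase values accepted by BigQuery's sourceFormat for external tables
# (subset; assumption based on the BigQuery tables API).
VALID_SOURCE_FORMATS = {"CSV", "PARQUET", "AVRO", "ORC", "NEWLINE_DELIMITED_JSON"}


def normalize_source_format(fmt: str) -> str:
    """Uppercase the format and fail fast on values BigQuery would reject."""
    normalized = fmt.strip().upper()
    # BigQuery spells JSON as NEWLINE_DELIMITED_JSON in sourceFormat.
    if normalized == "JSON":
        normalized = "NEWLINE_DELIMITED_JSON"
    if normalized not in VALID_SOURCE_FORMATS:
        raise ValueError(f"Unsupported sourceFormat: {fmt!r}")
    return normalized
```

With this helper, the operator call would pass source_format=normalize_source_format(format) rather than format.lower(), so "parquet" becomes "PARQUET" before it reaches the API.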
Error 1
[2023-08-25, 19:34:08 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 1751, in execute
    table = bq_hook.create_empty_table(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 467, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 420, in create_empty_table
    return self.get_client(project_id=project_id, location=location).create_table(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 782, in create_table
    api_response = self._call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 816, in _call_api
    return call()
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
    return retry_target(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project-id/datasets/airflow_hubspot_raw/tables?prettyPrint=false: Invalid value for sourceFormat: parquet is not a valid value
[2023-08-25, 19:34:08 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=hubspot_api_to_bq, task_id=create_external_table_engagements_notes, execution_date=20230825T180000, start_date=20230825T193407, end_date=20230825T193408
[2023-08-25, 19:34:08 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 1441 for task create_external_table_engagements_notes (400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project-id/datasets/airflow_hubspot_raw/tables?prettyPrint=false: Invalid value for sourceFormat: parquet is not a valid value; 86938)
[2023-08-25, 19:34:08 UTC] {local_task_job.py:212} INFO - Task exited with return code 1
[2023-08-25, 19:34:08 UTC] {taskinstance.py:2599} INFO - 0 downstream tasks scheduled from follow-on schedule check
Error 2
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/home/airflow/gcs/plugins/zendesk/op_default_incremental_load.py", line 71, in execute
    if self.has_register(project_dataset_table_name=self.bq_normal_table):
  File "/home/airflow/gcs/plugins/zendesk/op_default_incremental_load.py", line 122, in has_register
    df = hook.get_pandas_df(sql=query, dialect="standard")
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 269, in get_pandas_df
    return read_gbq(
  File "/opt/python3.8/lib/python3.8/site-packages/pandas_gbq/gbq.py", line 943, in read_gbq
    final_df = connector.run_query(
  File "/opt/python3.8/lib/python3.8/site-packages/pandas_gbq/gbq.py", line 529, in run_query
    self.process_http_error(ex)
  File "/opt/python3.8/lib/python3.8/site-packages/pandas_gbq/gbq.py", line 396, in process_http_error
    raise GenericGBQException("Reason: {0}".format(ex)) from ex
pandas_gbq.exceptions.GenericGBQException: Reason: 400 Table project-id.airflow_zendesk_raw.tb_custom_roles_external_2023082511 does not have a schema.
[2023-08-25, 15:37:11 UTC] {taskinstance.py:1328} INFO - Marking task as FAILED. dag_id=zendesk_incremental_api_to_bq, task_id=load_tb_custom_roles, execution_date=20230825T140000, start_date=20230825T153710, end_date=20230825T153711
[2023-08-25, 15:37:12 UTC] {logging_mixin.py:137} INFO - (Google Chat notification payload listing the failed create_external_table_* task instances omitted)
[2023-08-25, 15:37:12 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 1244 for task load_tb_custom_roles (Reason: 400 Table project-id.airflow_zendesk_raw.tb_custom_roles_external_2023082511 does not have a schema.
Upvotes: 0
Views: 177
Reputation: 1
I solved it by replacing BigQueryCreateExternalTableOperator with GoogleCloudStorageToBigQueryOperator: https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/gcs_to_bq/index.html (in current provider releases this operator is GCSToBigQueryOperator in airflow.providers.google.cloud.transfers.gcs_to_bigquery).
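The keyword arguments carry over almost one-to-one from the original operator. A minimal sketch of the replacement call, assuming illustrative bucket/prefix/table names (GCSToBigQueryOperator's external_table=True flag creates an external table instead of running a load job):

```python
# Hypothetical kwargs for GCSToBigQueryOperator, mirroring the original
# BigQueryCreateExternalTableOperator call; names here are placeholders.
gcs_to_bq_kwargs = {
    "task_id": "create_external_table_engagements_notes",
    "bucket": "my-bucket",  # assumption: your GCS bucket
    "source_objects": ["hubspot/2023/08/25/*.parquet"],  # assumption: your prefix
    "destination_project_dataset_table": "project-id.airflow_hubspot_raw.engagements_notes_external",
    "source_format": "PARQUET",  # note: uppercase, as BigQuery expects
    "autodetect": True,
    "external_table": True,  # keep it an external table, not a load job
}

# In a DAG this would be instantiated as:
# from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
# task = GCSToBigQueryOperator(**gcs_to_bq_kwargs)
```

Since GCSToBigQueryOperator validates source_format against its list of supported formats, passing the uppercase value avoids the "parquet is not a valid value" rejection seen in the question.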
Upvotes: 0