Reputation: 25
I have a problem with the BigQueryCreateExternalTableOperator's autodetect argument on Google Cloud Composer.
I use the operator as follows:
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table="<bq_dataset>.<bq_table>",
    bucket="data_exchange_bucket",
    autodetect=True,
    source_objects=["<filename>__part*.csv"],
)
When importing the DAG, Cloud Composer shows the following error:
I checked the operator's source code here and did find the "autodetect" argument in there. I tried passing the argument in a different position, although that should not make a difference, right?
Thanks a lot for any support.
Upvotes: 0
Views: 523
Reputation: 15979
The error means that you are running an older version of the Google provider. Specifically, you are running apache-airflow-providers-google<6.8.0.
The autodetect parameter for BigQueryCreateExternalTableOperator was released in version 6.8.0, so you need to bump the provider to apache-airflow-providers-google>=6.8.0 to get the new functionality. On Cloud Composer you can do this by adding the pinned package to the environment's PyPI packages configuration.
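If you want to confirm which provider version your environment is actually running, a minimal sketch (using the standard-library importlib.metadata, available on Python 3.8+) is:

from importlib.metadata import version

# Prints the installed provider version, e.g. "6.7.0"; anything below
# 6.8.0 lacks the autodetect parameter on BigQueryCreateExternalTableOperator.
print(version("apache-airflow-providers-google"))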
Should upgrading not be an option for you, you can create a custom operator with the missing functionality by backporting the code from the PR.
I didn't test it, but it would probably look something like this:
import json

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator


class MyBigQueryCreateExternalTableOperator(BigQueryCreateExternalTableOperator):
    def __init__(
        self,
        *,
        autodetect: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.autodetect = autodetect

    def execute(self, context: 'Context') -> None:
        bq_hook = BigQueryHook(
            gcp_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        # If a full table resource is given, create the table directly from it.
        if self.table_resource:
            bq_hook.create_empty_table(
                table_resource=self.table_resource,
            )
            return
        # Otherwise resolve the schema: download it from GCS if a schema object
        # is configured, else fall back to the explicit schema_fields.
        if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
            gcs_hook = GCSHook(
                gcp_conn_id=self.google_cloud_storage_conn_id,
                delegate_to=self.delegate_to,
                impersonation_chain=self.impersonation_chain,
            )
            schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
        else:
            schema_fields = self.schema_fields

        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

        # Same call as the upstream operator, plus the backported autodetect flag.
        bq_hook.create_external_table(
            external_project_dataset_table=self.destination_project_dataset_table,
            schema_fields=schema_fields,
            source_uris=source_uris,
            source_format=self.source_format,
            autodetect=self.autodetect,
            compression=self.compression,
            skip_leading_rows=self.skip_leading_rows,
            field_delimiter=self.field_delimiter,
            max_bad_records=self.max_bad_records,
            quote_character=self.quote_character,
            allow_quoted_newlines=self.allow_quoted_newlines,
            allow_jagged_rows=self.allow_jagged_rows,
            src_fmt_configs=self.src_fmt_configs,
            labels=self.labels,
            encryption_configuration=self.encryption_configuration,
        )
Then you can use MyBigQueryCreateExternalTableOperator in your DAGs.
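For example, the task from the question would become (untested; the import path my_operators is hypothetical and assumes you place the class somewhere importable by your DAGs, e.g. the plugins/ folder):

# Hypothetical module path - adjust to wherever you saved the custom operator.
from my_operators import MyBigQueryCreateExternalTableOperator

create_external_table = MyBigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table="<bq_dataset>.<bq_table>",
    bucket="data_exchange_bucket",
    autodetect=True,
    source_objects=["<filename>__part*.csv"],
)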
Upvotes: 1