Rob

Reputation: 25

Autodetect argument not found for BigQueryCreateExternalTableOperator

I have a problem with the BigQueryCreateExternalTableOperator's autodetect argument on Google Cloud Composer.

I use the operator in the following way:

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table="<bq_dataset>.<bq_table>",
    bucket="data_exchange_bucket",
    autodetect=True,
    source_objects=["<filename>__part*.csv"],
)

When importing the DAG, Cloud Composer shows a DAG import error.

I checked the operator's source code here and found the "autodetect" argument in there. I tried placing the argument in a different position, although that should not make a difference, right?

Thanks a lot for any support.

Upvotes: 0

Views: 523

Answers (1)

Elad Kalif

Reputation: 15979

The error means that you are running an older version of the Google provider, specifically apache-airflow-providers-google<6.8.0.

The autodetect parameter for BigQueryCreateExternalTableOperator was released in version 6.8.0, so you need to bump the provider to apache-airflow-providers-google>=6.8.0 to get the new functionality.
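If you are not sure which version your environment is running, you can check it from inside the environment, for example (a quick sketch using the standard library, not something specific to Composer):

import importlib.metadata

# Anything below 6.8.0 does not have the autodetect parameter on the operator.
print(importlib.metadata.version("apache-airflow-providers-google"))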

If upgrading is not an option for you, then you can create a custom operator with the missing functionality by backporting the code from the PR.

I didn't test it, but it would probably be something like:

import json

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator


class MyBigQueryCreateExternalTableOperator(BigQueryCreateExternalTableOperator):
    def __init__(
        self,
        *,
        autodetect: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # Store the backported parameter so execute() can pass it to the hook.
        self.autodetect = autodetect

    def execute(self, context: 'Context') -> None:
        bq_hook = BigQueryHook(
            gcp_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        if self.table_resource:
            # A full table resource was supplied; create the table directly from it.
            bq_hook.create_empty_table(
                table_resource=self.table_resource,
            )
            return
        if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
            # No inline schema: load the schema definition from the object in GCS.
            gcs_hook = GCSHook(
                gcp_conn_id=self.google_cloud_storage_conn_id,
                delegate_to=self.delegate_to,
                impersonation_chain=self.impersonation_chain,
            )
            schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
        else:
            schema_fields = self.schema_fields
        # Build fully qualified GCS URIs for the source files and pass the
        # backported autodetect flag through to the hook.
        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
        bq_hook.create_external_table(
            external_project_dataset_table=self.destination_project_dataset_table,
            schema_fields=schema_fields,
            source_uris=source_uris,
            source_format=self.source_format,
            autodetect=self.autodetect,
            compression=self.compression,
            skip_leading_rows=self.skip_leading_rows,
            field_delimiter=self.field_delimiter,
            max_bad_records=self.max_bad_records,
            quote_character=self.quote_character,
            allow_quoted_newlines=self.allow_quoted_newlines,
            allow_jagged_rows=self.allow_jagged_rows,
            src_fmt_configs=self.src_fmt_configs,
            labels=self.labels,
            encryption_configuration=self.encryption_configuration,
        )

Then you can use MyBigQueryCreateExternalTableOperator in your DAGs.
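For example (again untested; the dataset, table, bucket and file names are the placeholders from your question):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="create_external_table_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_external_table = MyBigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        destination_project_dataset_table="<bq_dataset>.<bq_table>",
        bucket="data_exchange_bucket",
        source_objects=["<filename>__part*.csv"],
        autodetect=True,
    )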

Upvotes: 1
