mez63

Reputation: 166

Authorisation error when running Airflow via Cloud Composer

I get an error when trying to run a DAG from Cloud Composer using the GoogleCloudStorageToBigQueryOperator.

The final error was: {'reason': 'invalid', 'location': 'gs://xxxxxx/xxxx.csv', ... and when I follow the URL link in the error I see:

{
  "error": {
    "code": 401,
    "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie     or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-    project.",
    "errors": [
      {
        "message": "Login Required.",
        "domain": "global",
        "reason": "required",
        "location": "Authorization",
        "locationType": "header"
      }
    ],
    "status": "UNAUTHENTICATED"
  }
}

I have configured the Cloud Storage connection as follows:

Conn Id: My_Cloud_Storage
Conn Type: Google Cloud Platform
Project Id: xxxxxx
Keyfile Path: /home/airflow/gcs/data/xxx.json
Keyfile JSON:
Scopes (comma separated): https://www.googleapis.com/auth/cloud-platform
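
To rule out a problem with the keyfile itself, I can authenticate with it directly against BigQuery. This is only a minimal sketch, assuming the google-cloud-bigquery and google-auth client libraries are available in the Composer environment; the keyfile path, scope and project ID are the ones from the connection above:

    from google.cloud import bigquery
    from google.oauth2 import service_account

    # Build credentials from the same keyfile the Airflow connection points at.
    credentials = service_account.Credentials.from_service_account_file(
        '/home/airflow/gcs/data/xxx.json',
        scopes=['https://www.googleapis.com/auth/cloud-platform'],
    )

    # If this call succeeds, the keyfile can authenticate to BigQuery on its own.
    client = bigquery.Client(credentials=credentials, project='xxxxxx')
    print(list(client.list_datasets()))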

The DAG code:

from __future__ import print_function

import datetime

from airflow import models
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2019, 4, 15),
}

with models.DAG(
        'Ian_gcs_to_BQ_Test',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Load the CSV from GCS into BigQuery, replacing the table contents.
    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_test',
        bucket='xxxxx',
        source_objects=['xxxx.csv'],
        destination_project_dataset_table='xxxx.xxxx.xxxx',
        google_cloud_storage_conn_id='My_Cloud_Storage',
        schema_fields=[
            {'name': 'AAAA', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'BBB_NUMBER', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')

Upvotes: 0

Views: 1189

Answers (2)

Carl24k

Reputation: 26

I had the exact same-looking error. What fixed it for me was adding the location of my dataset to the operator. So first, check the dataset information if you are not sure of its location, then add it as a parameter on your operator. For example, my dataset was in us-west1 and I was using an operator that looked like this:

    check1 = BigQueryCheckOperator(
        task_id='check_my_event_data_exists',
        sql="""
            select count(*) > 0
            from my_project.my_dataset.event
        """,
        use_legacy_sql=False,
        location="us-west1")   # THIS WAS THE FIX IN MY CASE

GCP error messages don't seem to be very good.

Upvotes: 0

mez63

Reputation: 166

OK, it's fixed now. It turns out it wasn't working because of the header row in the file; once I removed that, it worked fine. Pretty annoying that the error messages were so completely misleading, talking about invalid locations and authorization.
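
For anyone hitting the same thing: instead of stripping the header out of the file, the operator can be told to skip it. A minimal sketch of that variant, reusing the task from the question, with skip_leading_rows as the only change (assuming it is available in your Airflow version):

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_test',
        bucket='xxxxx',
        source_objects=['xxxx.csv'],
        destination_project_dataset_table='xxxx.xxxx.xxxx',
        google_cloud_storage_conn_id='My_Cloud_Storage',
        skip_leading_rows=1,  # skip the CSV header row rather than deleting it from the file
        schema_fields=[
            {'name': 'AAAA', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'BBB_NUMBER', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')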

Upvotes: 1
