Reputation: 166
I get an error when trying to run a DAG from Cloud Composer using the GoogleCloudStorageToBigQueryOperator.
The final error was: {'reason': 'invalid', 'location': 'gs://xxxxxx/xxxx.csv', ... and when I follow the URL link in the error I get:
{
  "error": {
    "code": 401,
    "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.",
    "errors": [
      {
        "message": "Login Required.",
        "domain": "global",
        "reason": "required",
        "location": "Authorization",
        "locationType": "header"
      }
    ],
    "status": "UNAUTHENTICATED"
  }
}
I have configured the Cloud Storage connection ...
Conn Id                   My_Cloud_Storage
Conn Type                 Google Cloud Platform
Project Id                xxxxxx
Keyfile Path              /home/airflow/gcs/data/xxx.json
Keyfile JSON
Scopes (comma separated)  https://www.googleapis.com/auth/cloud-platform
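To rule out the keyfile and scopes themselves, one quick sanity check is to call the GCS hook directly with the same connection id (a minimal sketch; the bucket and object names are the placeholders from above):

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

# Uses the connection configured above; if the keyfile and scopes are valid,
# this returns True/False instead of raising an authentication error.
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='My_Cloud_Storage')
print(hook.exists('xxxxx', 'xxxx.csv'))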
Code:
from __future__ import print_function

import datetime

from airflow import models
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2019, 4, 15),
}

with models.DAG(
        'Ian_gcs_to_BQ_Test',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_test',
        bucket='xxxxx',
        source_objects=['xxxx.csv'],
        destination_project_dataset_table='xxxx.xxxx.xxxx',
        google_cloud_storage_conn_id='My_Cloud_Storage',
        schema_fields=[
            {'name': 'AAAA', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'BBB_NUMBER', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE',
        dag=dag)
Upvotes: 0
Views: 1189
Reputation: 26
I had the exact same-looking error. What fixed it for me was adding the location of my dataset to my operator. As far as I can tell, the BigQuery job has to run in the same location as the dataset, and a mismatch surfaces as these misleading errors. So first, check the dataset's details in the BigQuery console if you are not sure of its location, then pass it as a parameter to your operator. For example, my dataset was in us-west1 and I was using an operator that looked like this:
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

check1 = BigQueryCheckOperator(
    task_id='check_my_event_data_exists',
    sql="""
    select count(*) > 0
    from my_project.my_dataset.event
    """,
    use_legacy_sql=False,
    location="us-west1")  # THIS WAS THE FIX IN MY CASE
GCP error messages don't seem to be very good.
Upvotes: 0
Reputation: 166
OK, it's fixed now. It turns out it wasn't working because of the header row in the file; presumably the header text couldn't be parsed against the INTEGER schema. Once I removed the header it worked fine. Pretty annoying: the error messages about invalid locations and authorization were completely misleading.
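For anyone hitting the same thing: rather than stripping the header out of the file, the operator's skip_leading_rows parameter should make the load job skip it, something like this (same placeholders as in the question):

load_csv = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_test',
    bucket='xxxxx',
    source_objects=['xxxx.csv'],
    destination_project_dataset_table='xxxx.xxxx.xxxx',
    google_cloud_storage_conn_id='My_Cloud_Storage',
    schema_fields=[
        {'name': 'AAAA', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'BBB_NUMBER', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ],
    skip_leading_rows=1,  # tell the load job to ignore the CSV header row
    write_disposition='WRITE_TRUNCATE')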
Upvotes: 1