Reputation: 11
The code runs fine on my computer, but when I put it in a DAG and run it through Airflow, it doesn't work. I use GCP with Cloud Composer. Other tasks in the same environment work fine against the same Cloud Storage bucket, and the Composer service account has all the permissions it needs.
def get_results():
    import pandas as pd
    df = pd.read_csv('gs://y/x.csv')

with models.DAG(
        ...) as dag:
    search_simmilar = python_operator.PythonOperator(
        task_id='search',
        python_callable=get_results
    )
Error in Airflow logs:
File "pandas/_libs/parsers.pyx", line 695, in pandas._libs.parsers.TextReader._setup_parser_sourc
FileNotFoundError: File b'gs://y/x.csv' does not exis
Upvotes: 1
Views: 3036
Reputation: 1
Create an Airflow Variable: in the Airflow Dashboard go to Admin -> Variables and create a new variable holding your GCS bucket. For example, I use gcs_bucket as the key and gs://your_unique_bucket as the value.
In the DAG file:
import datetime
import os

from airflow import models

output_file = os.path.join(models.Variable.get('gcs_bucket'), 'directory_name',
                           datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + '.csv'
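For completeness, a minimal sketch of how that path could be handed to a task via op_kwargs; the save_results name and the surrounding DAG context are illustrative assumptions, and reading or writing gs:// paths directly from pandas still requires gcsfs (see the pandas-version answer below):
import pandas as pd
from airflow.operators import python_operator

def save_results(output_path):
    # output_path is the gs://... string built from the Airflow Variable above.
    # pandas needs gcsfs installed to read/write gs:// URLs directly.
    df = pd.read_csv('gs://y/x.csv')
    df.to_csv(output_path, index=False)

# inside the `with models.DAG(...) as dag:` block
save_task = python_operator.PythonOperator(
    task_id='save_results',
    python_callable=save_results,
    op_kwargs={'output_path': output_file},
)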
Upvotes: 0
Reputation: 31
Check the pandas version installed in your Composer environment. Pandas 0.24.0 added support for reading from/writing to Google Cloud Storage via the gcsfs library.
NOTE: upgrading the pandas version may break existing operators, as the built-in hooks may depend on the older pandas version. E.g. BigQueryOperator can fail because of its pandas dependency. In that case you can use PythonVirtualenvOperator or KubernetesPodOperator, which let you install the required modules (like pandas >= 0.24.0) without affecting the existing environment.
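A minimal sketch of the PythonVirtualenvOperator route, assuming the Airflow 1.10-style imports used elsewhere on this page; the task id, callable name and requirement pins are illustrative:
from airflow.operators.python_operator import PythonVirtualenvOperator

def get_results_in_venv():
    # Imports must live inside the callable: it runs in a fresh virtualenv.
    import pandas as pd
    # With pandas>=0.24.0 and gcsfs installed, gs:// URLs can be read directly.
    df = pd.read_csv('gs://y/x.csv')
    print(df.head())

# inside the `with models.DAG(...) as dag:` block
search_in_venv = PythonVirtualenvOperator(
    task_id='search_in_venv',
    python_callable=get_results_in_venv,
    requirements=['pandas>=0.24.0', 'gcsfs'],
    system_site_packages=False,
)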
Upvotes: 1
Reputation: 121
GCP Composer uses Cloud Storage FUSE, which maps your Composer DAGs folder to the Google Cloud Storage bucket in which you place your DAGs (e.g. gs://bucket-name/dags).
I advise you to place files that are shared between DAGs in the folder /home/airflow/gcs/data, which is mapped to gs://bucket-name/data. You can read more about Google Cloud Storage and Composer here: https://cloud.google.com/composer/docs/concepts/cloud-storage
Also, here's an example:
import os

import pandas as pd

from airflow import models
from airflow.operators import python_operator


def get_results():
    # /home/airflow/gcs/data is the FUSE mount of gs://bucket-name/data
    path_to_csv = os.path.join('/home/airflow/gcs/data', 'y', 'x.csv')
    df = pd.read_csv(path_to_csv, header=None)

with models.DAG(
        ...) as dag:
    search_simmilar = python_operator.PythonOperator(
        task_id='search',
        python_callable=get_results
    )
Upvotes: 1
Reputation: 71
I can think of 2 ways of solving this:
1. Put the CSV in the dags folder along with your DAG.py file, i.e. upload it to the bucket shown as the "DAGs folder" for your Composer environment in the GCP Console (the same folder configured in airflow.cfg). The task can then read it locally at /home/airflow/gcs/dags/<path>/<to>/<file>.csv.
2. Use GoogleCloudStorageHook.download(bucket, object) inside the task to fetch the object, e.g. into a NamedTemporaryFile, and read it from there (see the sketch below).
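A minimal sketch of the second option, assuming the Airflow 1.10-style contrib import (newer provider packages expose this hook as airflow.providers.google.cloud.hooks.gcs.GCSHook); the bucket and object names are taken from the question:
from tempfile import NamedTemporaryFile

import pandas as pd
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def get_results():
    # Uses the environment's default GCP connection for authentication.
    hook = GoogleCloudStorageHook()
    with NamedTemporaryFile(suffix='.csv') as tmp:
        # Download gs://y/x.csv to a local temporary file, then read it with pandas.
        hook.download(bucket='y', object='x.csv', filename=tmp.name)
        df = pd.read_csv(tmp.name)
        print(df.head())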
Upvotes: 0