Aleksandra Niemczyk

Reputation: 11

How to fix: Python code doesn't work through an Airflow DAG: pandas.read_csv('gs://x/y.csv') says the file doesn't exist

The code runs fine on my computer, but when I put it in a DAG and run it through Airflow it fails. I use GCP with Cloud Composer. The other tasks work fine against the same Cloud Storage, and the Composer environment has all the permissions it needs.

from airflow import models
from airflow.operators import python_operator


def get_results():
    import pandas as pd
    df = pd.read_csv('gs://y/x.csv')


with models.DAG(
        ...) as dag:
    search_similar = python_operator.PythonOperator(
        task_id='search',
        python_callable=get_results
    )

Error in Airflow logs:

File "pandas/_libs/parsers.pyx", line 695, in pandas._libs.parsers.TextReader._setup_parser_sourc
FileNotFoundError: File b'gs://y/x.csv' does not exis

Upvotes: 1

Views: 3036

Answers (4)

muellerv

Reputation: 1

  1. Create a Variable: go to Dashboard -> Admin -> Variables and create a new variable holding your GCS bucket. For example, I use gcs_bucket as the key and gs://your_unique_key as the value.

  2. In the DAG file, import the modules and build the path:

import datetime
import os

from airflow import models

output_file = os.path.join(models.Variable.get('gcs_bucket'), 'directory_name',
                           datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + '.csv'
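
This builds an output path; for the read case in the question, the same Variable can supply the input path. A minimal sketch, assuming the environment's pandas can resolve gs:// paths (pandas >= 0.24.0 with gcsfs installed):

import os

import pandas as pd
from airflow import models


def get_results():
    # Build the gs:// path from the Airflow Variable created in step 1.
    input_file = os.path.join(models.Variable.get('gcs_bucket'), 'y', 'x.csv')
    # Requires pandas >= 0.24.0 and the gcsfs package in the environment.
    return pd.read_csv(input_file)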

Upvotes: 0

Salman Faris

Reputation: 31

Check the pandas version installed in your Composer environment. Pandas 0.24.0 added support for reading from and writing to Google Cloud Storage via the gcsfs library.

NOTE: upgrading the pandas version may break existing operators, since their hooks may depend on the older pandas version. For example, BigQueryOperator can fail because of its pandas dependency. In that case you can use PythonVirtualenvOperator or KubernetesPodOperator, which let you install the modules you need (like pandas > 0.24.0) without affecting the existing environment.
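
A minimal sketch of the PythonVirtualenvOperator route, assuming an Airflow 1.10-era Composer environment and the bucket/object names from the question; the operator is meant to sit inside the DAG's with block, and the task installs its own pandas and gcsfs without touching the preinstalled packages:

from airflow.operators.python_operator import PythonVirtualenvOperator


def read_from_gcs():
    # Imports must live inside the callable: it runs in a freshly built virtualenv.
    import pandas as pd
    df = pd.read_csv('gs://y/x.csv')
    print(df.head())


read_task = PythonVirtualenvOperator(
    task_id='read_from_gcs',
    python_callable=read_from_gcs,
    # Installed into the task's virtualenv only.
    requirements=['pandas>=0.24.0', 'gcsfs'],
    system_site_packages=False,
)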

Upvotes: 1

Tilen Kavčič

Reputation: 121

Cloud Composer uses Cloud Storage FUSE, which maps your Composer DAGs folder to the Cloud Storage bucket where you place your DAGs (e.g. gs://bucket-name/dags).

I advise you to place files that are shared between DAGs in the folder /home/airflow/gcs/data, which is mapped to gs://bucket-name/data. You can read more about Cloud Storage and Composer here: https://cloud.google.com/composer/docs/concepts/cloud-storage

Also here's an example:

import os

import pandas as pd
from airflow import models
from airflow.operators import python_operator


def get_results():
    # Read the shared file from the data folder, which FUSE maps to GCS.
    path_to_csv = os.path.join('/home/airflow/gcs/data', 'y', 'x.csv')
    df = pd.read_csv(path_to_csv, header=None)


with models.DAG(
        ...) as dag:
    search_similar = python_operator.PythonOperator(
        task_id='search',
        python_callable=get_results
    )

Upvotes: 1

Micah Miller

Reputation: 71

I can think of 2 ways of solving this:

  • Easy way
    • Put the CSV file in the dags folder along with your DAG .py file.
    • Composer automatically maps this directory structure to GCS when the environment is created, as you can see in airflow.cfg.
    • Access the file using the path /home/airflow/gcs/dags/<path>/<to>/<file>.csv.
  • Harder way (using an existing operator as an example; see the sketch below)
    • Create a GCS hook.
    • Run GoogleCloudStorageHook.download(bucket, object).
    • (Optional) Save the returned byte string to a NamedTemporaryFile.
    • Read this file or byte string into pandas.
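
A minimal sketch of the harder way, assuming the Airflow 1.10-era contrib GoogleCloudStorageHook and the bucket/object names from the question; here the object is downloaded straight into a temporary file rather than handled as a byte string:

import pandas as pd
from tempfile import NamedTemporaryFile
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook


def get_results():
    # Uses the default Google Cloud connection configured in Airflow.
    hook = GoogleCloudStorageHook()
    with NamedTemporaryFile(suffix='.csv') as tmp:
        # Download gs://y/x.csv to a local temp file, then hand it to pandas.
        hook.download(bucket='y', object='x.csv', filename=tmp.name)
        df = pd.read_csv(tmp.name)
    return df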

Upvotes: 0
