Tejas

Reputation: 131

GCP Composer: Run Python Script in another GCS bucket

I'm new to Airflow, and I'm trying to run a Python script that reads data from BigQuery, does some preprocessing, and exports a table back to BigQuery. This is the DAG I have:

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

YESTERDAY = datetime.now() - timedelta(days=1)

default_args = {
   'owner': 'me',
   'depends_on_past': False,
   'start_date': YESTERDAY,
   'email': [''],
   'email_on_failure': False,
   'email_on_retry': False,
   'retries': 0,
   'max_tries': 0,
}

with DAG(
   dag_id = 'my_code',
   default_args = default_args,
   schedule_interval = '@daily',
   catchup  = False
) as dag:

   import_data = BashOperator(
       task_id = 'daily_task',
       bash_command = 'python gs://project_id/folder1/python_script.py'
   )

This gives a 'No such file or directory' error. I did not set up the environment in Composer, so I'm not sure if it requires specific credentials. I tried storing the script in the dags folder, but then it wasn't able to access the BigQuery tables.

I have two questions:

  1. How do I properly define the location of the Python script within another GCS bucket? Should the gs:// location work if the proper credentials are applied, or do I necessarily have to store the scripts in a folder within the dags folder?
  2. How do I provide the proper credentials (like a login ID and password) within the DAG, in case that is all that's needed to solve the issue?

I handwrote the code since the original is on a work laptop and I cannot copy it. Let me know if there are any errors. Thank you!

Upvotes: 0

Views: 2115

Answers (1)

Mazlum Tosun

Reputation: 6582

To solve your issue, I propose a solution that is, in my opinion, easier to manage: whenever possible, keep the Python scripts within Composer's bucket.

  • Copy your Python script into the Composer bucket's DAG folder, either with a separate process outside of Composer (gcloud) or directly from the DAG (see the sketch after this list). If you want to do that in the DAG, you can check this link
  • Use a PythonOperator that invokes your Python script's logic inside the DAG
  • The Service Account used by Composer needs the appropriate privileges to read and write data in BigQuery. If you copy the Python script from within the DAG, the SA also needs the privileges to download the file from GCS in the second project.
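For the copy step, here is a minimal sketch of both options. The environment name, region, and bucket names below are placeholders, and the in-DAG variant assumes the apache-airflow-providers-google package that ships with Composer:

# Option A: one-off copy from outside Composer, e.g. from your shell or CI:
#   gcloud composer environments storage dags import \
#       --environment=YOUR_ENV --location=YOUR_REGION \
#       --source=gs://project_id/folder1/python_script.py

# Option B: copy from inside a DAG with the GCS-to-GCS transfer operator.
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

copy_script = GCSToGCSOperator(
    task_id='copy_script_to_composer_bucket',
    source_bucket='project_id',                 # bucket that holds python_script.py
    source_object='folder1/python_script.py',
    destination_bucket='your-composer-bucket',  # your Composer environment's bucket
    destination_object='dags/python_script.py',
)

Once the script is in the DAG folder, the DAG itself only needs to import and call its method: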
import airflow
from airflow.operators.python import PythonOperator

from your_script import your_method_with_bq_logic

with airflow.DAG(
        'your_dag',
        default_args=your_args,
        schedule_interval=None) as dag:

    # Invokes the BigQuery logic imported from the script in the DAG folder
    bq_processing = PythonOperator(
        task_id='bq_processing',
        python_callable=your_method_with_bq_logic
    )

    bq_processing

You can import the Python script's main method in the DAG code because the script now exists in the DAG folder.
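For reference, a minimal sketch of what your_script.py could contain, assuming the google-cloud-bigquery client library; the method name matches the import above, but the project, dataset, and table names are placeholders:

# your_script.py -- stored in the Composer bucket's dags/ folder
from google.cloud import bigquery

def your_method_with_bq_logic():
    # The client picks up the Composer Service Account automatically via
    # Application Default Credentials, so no login ID or password is needed.
    client = bigquery.Client()

    # Read the source table into a DataFrame
    df = client.query(
        'SELECT * FROM `your_project.your_dataset.source_table`'
    ).to_dataframe()

    # ... your preprocessing on df ...

    # Write the result back to BigQuery
    job = client.load_table_from_dataframe(
        df, 'your_project.your_dataset.destination_table'
    )
    job.result()  # wait for the load job to complete

This also covers the credentials question: inside Composer, the BigQuery client uses the environment's Service Account, so you don't pass a login ID and password in the DAG.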

Upvotes: 1
