Reputation: 131
I'm new to Airflow, and I'm trying to run a Python script that reads data from BigQuery, does some preprocessing, and exports a table back to BigQuery. This is the DAG I have:
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

YESTERDAY = datetime.now() - timedelta(days=1)

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': YESTERDAY,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'max_tries': 0,
}

with DAG(
    dag_id='my_code',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    import_data = BashOperator(
        task_id='daily_task',
        bash_command='python gs://project_id/folder1/python_script.py'
    )
This gives a 'No such file or directory' error. I did not set up the environment in Composer, so I'm not sure if it requires specific credentials. I tried storing the script in the dags folder, but then it wasn't able to access the BigQuery tables.
I have two questions:
I handwrote the code since the original is in a work laptop and I cannot copy. Let me know if there are any errors. Thank you!
Upvotes: 0
Views: 2115
Reputation: 6582
To solve your issue, I propose a solution which, in my opinion, is easier to manage. Whenever possible it is better to keep Python scripts within Composer's bucket:

1. Copy the Python script into the Composer bucket and DAG folder, either with a separate process outside of Composer (gcloud) or directly in the DAG.
2. Use a PythonOperator that invokes your Python script inside the DAG.

The service account used by Composer needs the right privileges to read and write data to BigQuery. If you copy the Python scripts directly in the DAG, the SA also needs the privilege to download files from GCS in the project.

import airflow
from airflow.operators.python import PythonOperator

from your_script import your_method_with_bq_logic
with airflow.DAG(
        'your_dag',
        default_args=your_args,
        schedule_interval=None) as dag:

    bq_processing = PythonOperator(
        task_id='bq_processing',
        python_callable=your_method_with_bq_logic
    )

    bq_processing
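If you go with the gcloud option instead, the copy step can look roughly like this (the environment name, location, and file name here are placeholders, not values from your project):

```shell
# Upload a local script into the Composer environment's DAG folder (a GCS bucket).
# "my-composer-env" and "us-central1" are hypothetical placeholders.
gcloud composer environments storage dags import \
    --environment my-composer-env \
    --location us-central1 \
    --source python_script.py
```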
You can import the Python script's main method in the DAG code because the script exists in the DAG folder.
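To see why that import works, here is a minimal sketch (no Airflow or GCP needed) that simulates a helper module sitting next to the DAG file, the way Composer's DAG folder behaves; the file and method names mirror the example above but are otherwise placeholders:

```python
import importlib
import pathlib
import sys
import tempfile

# Create a stand-in "DAG folder" containing a your_script.py.
dag_folder = pathlib.Path(tempfile.mkdtemp())
(dag_folder / "your_script.py").write_text(
    "def your_method_with_bq_logic():\n"
    "    # The real method would read from and write to BigQuery.\n"
    "    return 'processed'\n"
)

# Composer puts the DAG folder on sys.path, so a plain import resolves.
sys.path.insert(0, str(dag_folder))
your_script = importlib.import_module("your_script")

print(your_script.your_method_with_bq_logic())  # → processed
```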
Upvotes: 1