I need to pick a SQL file dynamically, based on a condition, and pass it to BigQueryOperator so that it is executed in a DAG.
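# Airflow 1.x import paths (assumed from the use of provide_context and BigQueryOperator)
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import PythonOperator

# airflow variable
SQL1 = 'SQL1.sql'
SQL2 = 'SQL2.sql'
SQL_to_run = ''

def condition_check():
    # weekday, default_args, project_name and dataset are defined elsewhere (not shown)
    if weekday == 'Sunday':
        SQL_to_run = 'SQL1.sql'
    elif weekday == 'Monday':
        SQL_to_run = 'SQL2.sql'
    else:
        print('no run')

with DAG(dag_id="DAG_NM",
         start_date=datetime(2024, 2, 21),
         template_searchpath=['/usr/local/airflow/dags/resources/'],
         schedule_interval="0 10 * * 7#1,7#2",
         default_args=default_args,
         catchup=False) as dag:

    # Task that is meant to decide which SQL file to run
    type_of_sql_task = PythonOperator(
        task_id="type_of_extract",
        python_callable=condition_check,
        provide_context=True,
    )

    # Task that should execute the selected SQL file
    execute_sql_data_task = BigQueryOperator(
        task_id="Execute_data",
        sql=SQL_to_run,
        params={'projectid': project_name, 'dataset': dataset},
        use_legacy_sql=False,
    )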
SQL_to_run is not recognized in BigQueryOperator. I also tried XCom push / pull, but could not get it to work.
Can someone please let me know how to get the SQL file name dynamically (or assign it to a variable) and use it in BigQueryOperator based on the condition?
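For reference, the selection logic I am after can be sketched in plain Python as below. This is only a rough sketch under assumptions: the task ids execute_sql1, execute_sql2 and no_run are hypothetical names, and the idea is that a function like this could serve as the python_callable of a BranchPythonOperator, with one BigQueryOperator per SQL file downstream. I have not verified that wiring in my DAG.

```python
from datetime import datetime

# Hypothetical task ids: each one would belong to a BigQueryOperator whose
# `sql` argument is fixed to the matching file (SQL1.sql or SQL2.sql).
# Keys follow datetime.weekday(): Monday is 0, Sunday is 6.
BRANCH_BY_WEEKDAY = {
    6: "execute_sql1",  # Sunday -> SQL1.sql
    0: "execute_sql2",  # Monday -> SQL2.sql
}

# Hypothetical id of a do-nothing task for days with no matching SQL file.
NO_RUN_TASK_ID = "no_run"


def choose_branch(run_date: datetime) -> str:
    """Return the task id to follow for the given run date."""
    return BRANCH_BY_WEEKDAY.get(run_date.weekday(), NO_RUN_TASK_ID)


if __name__ == "__main__":
    # 2024-02-25 is a Sunday, 2024-02-26 a Monday, 2024-02-21 a Wednesday.
    for day in (datetime(2024, 2, 25), datetime(2024, 2, 26), datetime(2024, 2, 21)):
        print(day.date(), choose_branch(day))
```

Using weekday() integers instead of day names avoids depending on the locale that strftime('%A') would use.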