Reputation: 2111
I am trying to create multiple Airflow tasks based on dag_run conf input. The conf would contain an array of values, and each value needs to spawn a task. Each task in turn needs to pass its value to its callable function. Something like this:
#create this task in a loop
task = PythonOperator(task_id="fetch_data", python_callable=fetch_data(value from array), retries=10)
Conf would have a value like:
{"fruits":["apple","kiwi","orange"]}
I think this can be accessed with:
kwargs['dag_run'].conf['fruits']
How do I access this value outside an operator and then create operators in a loop?
Upvotes: 2
Views: 4005
Reputation: 1
I had the same question. My workaround is to use Airflow Variables.
from airflow.models import Variable
foo_json = Variable.get("foo_baz", deserialize_json=True)
with DAG(...) as dag:
    for x in foo_json['task_list']:
        t1 = PythonOperator(
            task_id=f'task_{x}', ...)
The Variable holds a list, which a for loop inside the DAG can use to create a dynamic number of tasks. Whenever I need a different configuration for the tasks, I update the Variable right before triggering the DAG run. This takes two commands instead of one, but it works the same way whether the run is triggered manually, through the API, or from the command line.
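A minimal sketch of the parsing step, assuming the Variable stores JSON shaped like {"task_list": [...]}. The helper name load_task_list is hypothetical and uses only the standard library; the idea is that a missing or malformed value yields an empty list instead of an exception while the DAG file is being parsed.

```python
import json


def load_task_list(raw_value, key="task_list"):
    """Return the list stored under `key` in a JSON string, or [] if unusable.

    `load_task_list` is a hypothetical helper, not part of Airflow.
    """
    if not raw_value:
        return []
    try:
        parsed = json.loads(raw_value)
    except (TypeError, ValueError):
        # Not a string, or not valid JSON.
        return []
    if not isinstance(parsed, dict):
        return []
    items = parsed.get(key, [])
    if not isinstance(items, list):
        return []
    # Drop duplicates while keeping order, because task_ids built from
    # these values must be unique within a DAG.
    seen = set()
    unique = []
    for item in items:
        name = str(item)
        if name not in seen:
            seen.add(name)
            unique.append(name)
    return unique
```

In the DAG file this could be fed with the raw string, for example load_task_list(Variable.get("foo_baz", default_var=None)) on Airflow 2, where default_var is the keyword for the fallback value; other versions may name it differently.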
Upvotes: 0
Reputation: 26
I wish Airflow had some sort of parallel-for operator, like Kubeflow has. To solve this problem in Airflow, I ended up triggering another DAG run with the TriggerDagRunOperator. It looks something like this:
from airflow.operators.dagrun_operator import TriggerDagRunOperator
def trigger_extract_dag(**kwargs):
    config = kwargs['dag_run'].conf
    for stuff_dict in config['stuff_to_extract']:
        dag_task = TriggerDagRunOperator(
            task_id='trigger-extraction-dag',
            trigger_dag_id=EXTRACTION_DAG_NAME,
            conf=stuff_dict
        )
        dag_task.execute(dict())
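The loop above reuses one task_id for every triggered run. A small sketch of how the fan-out could be planned first, assuming the conf carries a list of dicts under stuff_to_extract as in the snippet; plan_triggers and its parameters are hypothetical names, and the function is plain Python with no Airflow dependency.

```python
def plan_triggers(conf, key="stuff_to_extract", prefix="trigger-extraction-dag"):
    """Return one (task_id, payload) pair per entry found under `key`.

    `plan_triggers` is a hypothetical helper, not part of Airflow.
    """
    # Treat a missing conf or a missing key as "nothing to trigger".
    entries = (conf or {}).get(key) or []
    plan = []
    for index, payload in enumerate(entries):
        # The index suffix gives every trigger a distinct task_id;
        # dict(payload) copies the entry so the original conf is not shared.
        plan.append((f"{prefix}-{index}", dict(payload)))
    return plan
```

Each pair could then supply the task_id and conf arguments of a TriggerDagRunOperator inside trigger_extract_dag.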
Upvotes: 1
Reputation: 591
You can wrap your PythonOperator instantiation in a for loop that consumes the list of values.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id='fruit_name_printer',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@once'
)

input = [
    'apple',
    'orange',
    'banana'
]

def call_func(fruit_name):
    print(fruit_name)

with dag:
    for fruit in input:
        printer = PythonOperator(
            task_id=f'print_{fruit}',
            python_callable=call_func,
            op_kwargs={
                'fruit_name': fruit
            }
        )
Upvotes: 0