Reputation: 845
Say I have a simple TaskFlow style DAG.
from datetime import datetime
from airflow.decorators import dag, task
from typing import Dict

@dag(
    start_date=datetime.now(),
    schedule_interval='@once',
    catchup=False,
)
def example_taskflow_api():
    @task()
    def extract(value=666) -> Dict[str, int]:
        order_data = {"1001": 301.27, "1002": 433.21, "1003": value}
        return order_data

    extract()

my_dag = example_taskflow_api()
In the "old" style I could pass kwarg values to the operator, either in code or via the Airflow UI, for example:
t1 = PythonVirtualenvOperator(
    task_id='extract',
    python_callable=extract,
    op_kwargs={"value": 777},
    dag=dag,
)
But I cannot find any reference in the tutorial or docs for how to achieve a similar result using TaskFlow style. The last section of the tutorial mentions Airflow context arguments, but not optional arguments.
Thank you
Upvotes: 0
Views: 2756
Reputation: 5100
You can pass arguments to the decorated task like you would to any other Python function:
extract(value=777)
If you want to pass the kwargs as a dict:
extract(**{"value": 777})
If you want to pass a variable from the run config, you have two options:
# option 1: provide it as a templated argument
extract("{{ dag_run.conf.get('value') }}")  # the rendered value will be a string

# option 2: read it from the context inside the task
@task()
def extract(**context) -> Dict[str, int]:
    # here the variable keeps its initial type
    order_data = {
        "1001": 301.27,
        "1002": 433.21,
        "1003": context["dag_run"].conf.get('value'),
    }
    return order_data
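If you also want to keep the default from the question when no run config is supplied, the lookup can be moved into a small helper. The following is only a sketch under some assumptions: `build_order_data` is a hypothetical name (not part of Airflow), a plain dict stands in for `dag_run.conf`, and the value is assumed to be numeric whenever it is present. It also converts the string you would get from option 1 back into a number.

```python
from typing import Any, Dict, Mapping, Optional


def build_order_data(
    conf: Optional[Mapping[str, Any]], default: float = 666
) -> Dict[str, float]:
    """Hypothetical helper: take "value" from the run config, else use the default."""
    # The run config may be empty or missing when the DAG is not triggered with one.
    raw = (conf or {}).get("value", default)
    # A templated argument (option 1) arrives as a string, so convert it to a number.
    value = float(raw)
    return {"1001": 301.27, "1002": 433.21, "1003": value}


# Plain dicts stand in for dag_run.conf here.
print(build_order_data(None))
print(build_order_data({"value": 777}))
print(build_order_data({"value": "777"}))
```

Inside the task from option 2 you could then call `build_order_data(context["dag_run"].conf)` instead of building the dict inline.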
Upvotes: 3