BenP

Reputation: 845

How to pass op_kwargs to Airflow TaskFlow DAG and Task?

Say I have a simple TaskFlow style DAG.

from datetime import datetime
from airflow.decorators import dag, task
from typing import Dict

@dag(
    start_date=datetime.now(),
    schedule_interval='@once',
    catchup=False,
)
def example_taskflow_api():

    @task()
    def extract(value=666) -> Dict[str, int]:
        order_data = {"1001": 301.27, "1002": 433.21, "1003": value}
        return order_data

    extract()
my_dag = example_taskflow_api()

In the "old" style I might pass some kwarg values, or via the airflow ui, to the operator such as:

t1 = PythonVirtualenvOperator(
    task_id='extract',
    python_callable=extract,
    op_kwargs={"value":777},
    dag=dag,
    )

But I cannot find any reference in the tutorial or the docs for how to achieve a similar result using the TaskFlow style. The last section of the tutorial mentions Airflow context arguments, but not optional keyword arguments.

Thank you

Upvotes: 0

Views: 2756

Answers (1)

Hussein Awala

Reputation: 5100

You can pass the arguments as you would to any other Python function:

extract(value=777)
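
Applied to the DAG from the question, a minimal sketch (same extract task, only the call changes) could look like this:

from datetime import datetime
from typing import Dict

from airflow.decorators import dag, task


@dag(
    start_date=datetime.now(),
    schedule_interval='@once',
    catchup=False,
)
def example_taskflow_api():

    @task()
    def extract(value=666) -> Dict[str, int]:
        order_data = {"1001": 301.27, "1002": 433.21, "1003": value}
        return order_data

    # the keyword argument replaces op_kwargs={"value": 777}
    extract(value=777)


my_dag = example_taskflow_api()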

If you want to pass a dict of kwargs:

extract(**{"value": 777})

If you want to pass a variable from the run config, you have two options:

# option 1: pass it as a templated argument
extract("{{ dag_run.conf.get('value') }}")  # the rendered value will be a string

# option 2: read it from the Airflow context inside the task
@task()
def extract(**context) -> Dict[str, int]:
    # reading from dag_run.conf keeps the value's original type
    order_data = {"1001": 301.27, "1002": 433.21, "1003": context["dag_run"].conf.get('value')}
    return order_data
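
For option 2, here is a minimal end-to-end sketch, reusing the DAG from the question; the trigger command in the comment assumes the Airflow 2 CLI:

from datetime import datetime
from typing import Dict

from airflow.decorators import dag, task


@dag(
    start_date=datetime.now(),
    schedule_interval='@once',
    catchup=False,
)
def example_taskflow_api():

    @task()
    def extract(**context) -> Dict[str, int]:
        # dag_run.conf keeps the original type of the value
        value = context["dag_run"].conf.get('value')
        order_data = {"1001": 301.27, "1002": 433.21, "1003": value}
        return order_data

    # no argument needed; the task reads the value from the run config
    extract()


my_dag = example_taskflow_api()

# then trigger a run with a conf payload, e.g.:
#   airflow dags trigger example_taskflow_api --conf '{"value": 777}'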

Upvotes: 3
