sudeepgupta90

Reputation: 795

How to pass dynamic arguments to an Airflow operator?

I am using Airflow to run Spark jobs on Google Cloud Composer. I need to

With the Airflow API, I can read YAML files and push variables across tasks using XCom.

But, consider the DataprocClusterCreateOperator()

and a few other arguments are marked as templated.

What if I want to pass other arguments as templated, even though they are not currently marked as such, for example image_version, num_workers, worker_machine_type, etc.?

Is there any workaround for this?

Upvotes: 4

Views: 6704

Answers (1)

AC at CA

Reputation: 735

Not sure what you mean by 'dynamic', but if the YAML file is read in the body of the DAG file, the DAG is refreshed with the new arguments whenever the YAML file is updated. So you don't actually need XCom to get the arguments. Simply create a params dictionary and pass it to default_args:

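import os

import yaml
from airflow import DAG
# Airflow 1.x contrib import path; adjust it for your Airflow version.
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.safe_load(ymlfile)  # safe_load does not construct arbitrary Python objects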

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'image_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # add any other parameters you need here.
}

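# DAG_NAME and SCHEDULE_INTERVAL are assumed to be defined elsewhere in the DAG file.
dag = DAG(  # lowercase name so the imported DAG class is not shadowed
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTERVAL,
    default_args=default_args,  # pass the params to the DAG environment
)

task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=dag,
)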
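
As a rough sketch of the default_args step, the per-key lookups could be wrapped in a small helper that fails early when the YAML file lacks a key. The helper name build_default_args and all the sample values are made up for illustration; the CFG dictionary here only stands in for what parsing the YAML file would return.

```python
def build_default_args(cfg, section, keys):
    """Copy the named keys from cfg[section] into a new default_args dict."""
    values = cfg[section]
    missing = [key for key in keys if key not in values]
    if missing:
        # Fail while the DAG file is parsed rather than when the task runs.
        raise KeyError(
            "missing keys in section %r: %s" % (section, ", ".join(missing)))
    return {key: values[key] for key in keys}


# Stand-in for the parsed YAML file (hypothetical values).
CFG = {
    'section_A': {
        'cluster_name': 'example-cluster',
        'project_id': 'example-project',
        'zone': 'europe-west1-b',
        'image_version': '1.4',
        'num_workers': 2,
        'worker_machine_type': 'n1-standard-4',
    }
}

default_args = build_default_args(
    CFG,
    'section_A',
    ['cluster_name', 'project_id', 'zone',
     'image_version', 'num_workers', 'worker_machine_type'],
)
```

The resulting dictionary can be passed as default_args exactly like the hand-written one.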

But if you want dynamic DAGs rather than dynamic arguments, you may need a different strategy, like this.

So you probably need to settle the basic question first: at which level do things need to be dynamic? Task level? DAG level?

Or you can create your own Operator to do the job and take the parameters.
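
A minimal sketch of the custom-operator route, assuming the only change you need is to have more arguments rendered as templates: Airflow renders the attributes named in an operator's template_fields, so a subclass can extend that tuple. The helper with_extra_template_fields and the FakeClusterOperator stand-in are hypothetical names, used so the sketch runs without Airflow installed.

```python
def with_extra_template_fields(operator_cls, extra_fields):
    """Return a subclass of operator_cls whose template_fields also lists extra_fields."""
    base_fields = tuple(getattr(operator_cls, 'template_fields', ()))
    # Skip names that are already templated so nothing is listed twice.
    added = tuple(f for f in extra_fields if f not in base_fields)

    class Templated(operator_cls):
        template_fields = base_fields + added

    Templated.__name__ = 'Templated' + operator_cls.__name__
    return Templated


class FakeClusterOperator(object):
    """Hypothetical stand-in for a real operator class."""
    template_fields = ('cluster_name', 'project_id')


TemplatedCluster = with_extra_template_fields(
    FakeClusterOperator, ['image_version', 'num_workers', 'project_id'])
print(TemplatedCluster.template_fields)
```

With Airflow installed, the same call could be tried on DataprocClusterCreateOperator, after which a value such as image_version='{{ var.value.image_version }}' should be rendered at run time. Rendered values normally arrive as strings, so numeric arguments like num_workers may still need converting.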

Upvotes: 7
