Musa

Reputation: 99

How do I pass parameters from python operator to batch operator?

I have the DAG below, which is triggered via the CLI with a conf parameter for job_id. The DAG triggers successfully and I can retrieve the job_id, but I'm not sure how to pass this parameter on to the BatchOperator.

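from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.batch import BatchOperator

dag = DAG(
    dag_id='example_batch_submit_job',
    schedule_interval=None,
    start_date=datetime(2022, 7, 14),
    tags=['batch_job'],
    catchup=False)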

def get_inputs_config(**kwargs):
    # Read the job_id that was passed with --conf when the DAG was triggered
    parameters = "{}".format(kwargs['dag_run'].conf['job_id'])
    print("Remotely received value of {} for key=job_id".format(parameters))
    return parameters

run_this = PythonOperator(
    task_id='get_input',
    provide_context=True,
    python_callable=get_inputs_config,
    dag=dag,
)

submit_batch_job = BatchOperator(
    task_id='submit_batch_job_etl',
    job_name=JOB_NAME,
    job_queue=JOB_QUEUE,
    job_definition=JOB_DEFINITION,
    parameters={},  # <-------- here
)

Upvotes: 0

Views: 613

Answers (1)

ozs

Reputation: 3681

You can read the value directly from dag_run.conf with a Jinja template, without the Python function:

submit_batch_job = BatchOperator(
    task_id='submit_batch_job_etl',
    job_id='{{ dag_run.conf["job_id"] }}',
    job_queue=JOB_QUEUE,
    job_definition=JOB_DEFINITION,
    parameters={'job_id': '{{ dag_run.conf["job_id"] }}'},
)

Or, if you want to get it from the task's XCom, you can do the following:

submit_batch_job = BatchOperator(
    task_id='submit_batch_job_etl',
    job_id='{{ dag_run.conf["job_id"] }}',
    job_queue=JOB_QUEUE,
    job_definition=JOB_DEFINITION,
    parameters="{{ task_instance.xcom_pull(task_ids='get_input', key='return_value') }}",
)

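In that case, though, parameters has to end up as a dict, so get_inputs_config must return a dict rather than a string, and the DAG must be created with render_template_as_native_obj=True so that the template renders to a native Python object instead of a string.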
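
To illustrate why that flag matters, here is a minimal sketch that uses Jinja2 directly rather than Airflow. As far as I know, Airflow switches to Jinja's NativeEnvironment when render_template_as_native_obj=True. The template string and the names value, as_text and as_native are only illustrative, and "1234" is a made-up job id.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# Stand-in for the dict that the upstream task would push to XCom (hypothetical value)
value = {"job_id": "1234"}
template = "{{ pulled }}"

# Default rendering: the result is always a string
as_text = Environment().from_string(template).render(pulled=value)

# Native rendering: the result keeps its Python type, here a dict
as_native = NativeEnvironment().from_string(template).render(pulled=value)

print(type(as_text).__name__, type(as_native).__name__)
```

With the default environment the operator would receive the string form of the dict, which is not what a dict-typed argument such as parameters expects. With native rendering it receives the dict itself.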

Upvotes: 2
