Reputation: 99
I have the below dag which is triggered via cli with a conf parameter for job_id. I have successfully triggered the dag, and can retrieve the job_id but im not sure how to now pass this parameter to the batchoperator?
dag = DAG(
dag_id='example_batch_submit_job',
schedule_interval=None,
start_date=datetime(2022, 7, 14),
tags=['batch_job'],
catchup=False)
def get_inputs_config(**kwargs):
parameters = ("{}".format(kwargs['dag_run'].conf['job_id'])
print("Remotely received value of {} for key=job_id".format(parameters))
return parameters
run_this = PythonOperator(
task_id='get_input',
provide_context=True,
python_callable=get_inputs_config,
dag=DAG,
)
submit_batch_job = BatchOperator(
task_id='submit_batch_job_etl',
job_name=JOB_NAME,
job_queue=JOB_QUEUE,
job_definition=JOB_DEFINITION,
parameters={} <-------- here
)
Upvotes: 0
Views: 613
Reputation: 3681
You can go directly to the conf without the python function
submit_batch_job = BatchOperator(
task_id='submit_batch_job_etl',
job_id='{{ dag_run.conf["job_id"] }}',
job_queue=JOB_QUEUE,
job_definition=JOB_DEFINITION,
parameters={ 'job_id' : '{{ dag_run.conf["job_id"] }}'}
)
or, if you want to get it from the task xcom, you can do as follow
submit_batch_job = BatchOperator(
task_id='submit_batch_job_etl',
job_id='{{ dag_run.conf["job_id"] }}',
job_queue=JOB_QUEUE,
job_definition=JOB_DEFINITION,
parameters={{ task_instance.xcom_pull(task_ids='get_input', key='return_value') }}
)
but in that case you need parameters to be a dict and in the dag configuration to set render_template_as_native_obj=True
Upvotes: 2