Reputation: 41
I am trying to run a DAG from REST API and pass some parameters to it. The DAG should be able to catch the parameters and use it. The problem is I am able to trigger the DAG from REST API,but the DAG is not able to catch the parameters passed. Is there a way to achieve this?
I am triggering the DAG from REST API as below.It passes the parameters in --conf
http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2
How to capture the values passed in conf value in the called DAG. As far as I know the conf should take the URL encoded JSON format data.
DAG code:`
def run_this_func(**kwargs):
print(kwargs)
run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag
)`
Upvotes: 4
Views: 7543
Reputation: 683
This one took some digging as I wanted to use the conf to create a dynamic workflow, not just to be used in other operators. To use the conf parameters in the overall DAG:
args = {"owner": "me", "depends_on_past": False}
with DAG(
dag_id="my_dag",
default_args=args
schedule_interval="@daily",
start_date=dats_ago(1)
) as dag
conf = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf
So if the arguments you passed in the conf were
{"table_names": ["table1", "table2", "table3"]}
You can access the table names list by doing
table_names = conf["tables_names"]
Upvotes: 1
Reputation: 87
The external parameters passed are part of the dag_run
object. They can be accessed as follows:
API Request
import requests
headers = {
'Cache-Control': 'no-cache',
'Content-Type': 'application/json',
}
data = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'
response = requests.post('http://localhost:8080/api/experimental/dags/<dag_id>/dag_runs', headers=headers, data=data)
DAG
def run_this_func(**context):
print("Received {} for key=message".format(context["dag_run"].conf))
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)
Upvotes: 3
Reputation: 1422
I did not know that you could trigger a DAG with HTTP GET, but I've successfully triggered with conf using POST and following the documentation https://airflow.apache.org/api.html
For example triggering the dag "trigger_test_dag":
curl -X POST --data '"conf":"{\"key\":\"value\"}"' \
"http://abcairflow.com:8090/api/experimental/dags/trigger_test_dag/dag_runs"
Pay attention to the escaping of apostrophes as conf needs to be a string. I guess you can do a base 64 encode, and then decode in the DAG, to the string if you prefer that.
Upvotes: 2
Reputation: 6538
Unfortunately, this is not a well-documented feature, but there are examples of a DAG triggering another DAG with the conf
set and the target DAG using it. See example_trigger_controller_dag and example_trigger_target_dag. DAGs triggered by an operator, REST API, or CLI should all pass the conf
parameter in the same way.
conf
is accessible inside the context, so you'll need to make sure you pass provide_context=True
when using a PythonOperator
.
def run_this_func(**kwargs):
print(kwargs['conf'])
run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag,
provide_context=True,
)
Upvotes: 1