Reputation: 2236
I'm trying to get the data rendered correctly inside a SimpleHttpOperator in Airflow, using the configuration that I pass via dag_run:
result = SimpleHttpOperator(
    task_id="schema_detector",
    http_conn_id='schema_detector',
    endpoint='api/schema/infer',
    method='PUT',
    data=json.dumps({
        'url': '{{ dag_run.conf["url"] }}',
        'fileType': '{{ dag_run.conf["fileType"] }}',
    }),
    response_check=lambda response: response.ok,
    response_filter=lambda response: response.json())
The issue is that the rendered data looks like this:
{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}
I'm not sure what I'm doing wrong here.
Edit: Full code below.
import json
import pprint

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(0),
}

def print_result(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
    pprint.pprint(pulled_value_1)

with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    pull = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
    )

    result >> pull
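For reference, the DAG is triggered with a configuration along these lines (placeholder values; the Airflow 1.10 CLI is shown, in Airflow 2 the equivalent command is airflow dags trigger):
airflow trigger_dag --conf '{"url": "https://example.com/data.csv", "fileType": "csv"}' airflow_http_operator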
Upvotes: 3
Views: 4358
Reputation: 435
I struggled a lot with the same error, so I created my own operator (called ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)
This operator receives a function in which we can collect the data passed when the DAG is triggered (via dag_run.conf) and parse it (if required) before sending it to the API.
from plugins.operators.extended_http_operator import ExtendedHttpOperator

testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=lambda response: True
    if validate_response(response) is True
    else False,
)
def passing_data(**context):
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])

def validate_response(res):
    if res.status_code == 200:
        return True
    else:
        return False
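With op_kwargs={"api": "kafka"}, passing_data expects the dag_run conf to contain a "kafka" key. A hypothetical trigger for this setup (placeholder payload and DAG id, Airflow 1.10 CLI):
airflow trigger_dag --conf '{"kafka": {"user_ids": [1, 2, 3]}}' your_dag_id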
Here is how you can add the ExtendedHttpOperator to your Airflow project:
Put the extended_http_operator.py file inside the your_airflow_project/plugins/operators folder:
# extended_http_operator.py file
from typing import Optional, Dict

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook

"""
Extend SimpleHttpOperator with a callable that formulates the request data. This data
function has access to the context, so it can retrieve data such as the task instance.
This allows us to write cleaner code rather than one long template line to formulate
the JSON data.
"""

class ExtendedHttpOperator(SimpleHttpOperator):
    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Make op_kwargs available to the data function alongside the regular context
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        # Build the request payload at runtime, when dag_run.conf is available
        data_result = self.execute_callable(context)
        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")

    def execute_callable(self, context):
        return self.data_fn(**context)
Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.
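The resulting plugin layout should look roughly like this:
your_airflow_project/
    plugins/
        __init__.py
        operators/
            __init__.py
            extended_http_operator.py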
Upvotes: 2
Reputation: 2236
I couldn't find a solution. The only way I could pass the information that I send over --conf to the operator was to add a PythonOperator that collects the info, and then use XCom in my SimpleHttpOperator.
Code
def generate_data(**kwargs):
    confs = kwargs['dag_run'].conf
    logging.info(confs)
    return {'url': confs["url"], 'fileType': confs["fileType"]}

with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    generate_dict = PythonOperator(
        task_id='generate_dict',
        python_callable=generate_data,
        provide_context=True
    )

    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data="{{ task_instance.xcom_pull(task_ids='generate_dict') | tojson }}",
        log_response=True,
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    # The PythonOperator must run first so its XCom value is available to the template
    generate_dict >> result
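With this in place, the rendered data for schema_detector becomes the actual JSON, e.g. (hypothetical conf values):
{"url": "https://example.com/data.csv", "fileType": "csv"}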
Upvotes: 0