Tizianoreica

Reputation: 2236

SimpleHttpOperator in Airflow: templated data not rendered

I'm trying to get the data inside a SimpleHttpOperator in Airflow to render correctly, using the configuration that I send via dag_run:

result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

The issue is that the rendered data looks like this:

{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}

I'm not sure what I'm doing wrong here.

Edit: full code below

# Imports assumed for the operators used below (Airflow 1.10-style paths,
# matching the rest of the post).
import json
import pprint

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(0),
}


def print_result(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
    pprint.pprint(pulled_value_1)


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:

    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    pull = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
    )
    result >> pull
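
For reference, the values those templates read from dag_run.conf only exist if a conf is supplied when the run is triggered. Below is a minimal sketch of triggering this DAG with a conf programmatically, assuming Airflow's local API client; the url and fileType values are placeholders. The CLI equivalent is airflow trigger_dag --conf (or airflow dags trigger --conf on newer versions).

# Sketch only: supply the conf that the templated `data` field reads at runtime.
from airflow.api.client.local_client import Client

client = Client(None, None)
client.trigger_dag(
    dag_id="airflow_http_operator",
    conf={"url": "https://example.com/data.csv", "fileType": "csv"},  # placeholder values
)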

Upvotes: 3

Views: 4358

Answers (2)

Vinit Bodhwani

Reputation: 435

I struggled a lot with the same error, so I created my own operator (called ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)

This operator receives a function in which we can collect the data passed to the DAG (via dag_run.conf) and parse it (if required) before sending it to the API.

from plugins.operators.extended_http_operator import ExtendedHttpOperator

testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=lambda response: validate_response(response),
)

import json


def passing_data(**context):
    # op_kwargs and dag_run are both available in the context passed to data_fn.
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])


def validate_response(res):
    return res.status_code == 200
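
For context, here is a hypothetical conf shape that this data_fn expects when the DAG is triggered (the "kafka" key and payload below are made up for illustration):

# Hypothetical trigger conf: op_kwargs ("api": "kafka") picks which entry of
# dag_run.conf becomes the request body.
example_conf = {
    "kafka": {"user_ids": [1, 2, 3]},  # made-up payload
}
# With this conf, passing_data(api="kafka", dag_run=<run>) returns
# json.dumps(example_conf["kafka"]), i.e. '{"user_ids": [1, 2, 3]}'.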

Here is how you can add the ExtendedHttpOperator to your Airflow project:

Put the extended_http_operator.py file inside the your_airflow_project/plugins/operators folder:

# extended_http_operator.py file

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from typing import Optional, Dict

"""
Extend Simple Http Operator with a callable function to formulate data. This data function will
be able to access the context to retrieve data such as task instance. This allow us to write cleaner 
code rather than writing one long template line to formulate the json data.
"""


class ExtendedHttpOperator(SimpleHttpOperator):
    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
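        # Merge op_kwargs into the template context so data_fn can read them
        # alongside dag_run, ti, etc.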
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        data_result = self.execute_callable(context)

        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False")

    def execute_callable(self, context):
        return self.data_fn(**context)

Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.
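
The resulting layout looks roughly like this (your_airflow_project is a placeholder name):

your_airflow_project/
└── plugins/
    ├── __init__.py
    └── operators/
        ├── __init__.py
        └── extended_http_operator.py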

Upvotes: 2

Tizianoreica

Reputation: 2236

I couldn't find a proper solution. The only way I could pass the information I send via --conf to the operator was to add a PythonOperator that collects the info and then use XCom in my SimpleHttpOperator.

Code

def generate_data(**kwargs):
    confs = kwargs['dag_run'].conf
    logging.info(confs)
    return {'url': confs["url"], 'fileType': confs["fileType"]}


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    generate_dict = PythonOperator(
        task_id='generate_dict',
        python_callable=generate_data,
        provide_context=True
    )
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data="{{ task_instance.xcom_pull(task_ids='generate_dict') |tojson}}",
        log_response=True,
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())
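
To see what that templated data string resolves to, here is a minimal sketch outside Airflow using plain Jinja2 (FakeTI and the url/fileType values are made up; Airflow's own templating does the equivalent at runtime):

# Sketch only: emulate what the template above produces once the XCom exists.
from jinja2 import Environment


class FakeTI:
    """Stand-in for the real TaskInstance."""

    def xcom_pull(self, task_ids=None):
        # What generate_data would have pushed, with made-up values.
        return {"url": "https://example.com/data.csv", "fileType": "csv"}


rendered = Environment().from_string(
    "{{ task_instance.xcom_pull(task_ids='generate_dict') | tojson }}"
).render(task_instance=FakeTI())
print(rendered)  # e.g. {"fileType": "csv", "url": "https://example.com/data.csv"}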

Upvotes: 0
