QDex
QDex

Reputation: 163

Pass data to jinja template in .sql file from a CustomOperator(BaseOperator)

How can we pass data by using a custom operator task ? Here is my code. The sql file does not take the value of the key table_name. The rendered template suposed to be

SELECT COUNT(*) FROM product;

custom_test_operator.py

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyTestOperator(BaseOperator):
    template_fields= ("my_file","parameters",)
    
    @apply_defaults
    def __init__(self, my_file, parameters, *args, **kwargs):
        super(MyTestOperator, self).__init__(*args,**kwargs)
        self.my_file= my_file
        self.parameters=parameters
    
    def execute(self, context):
        print("le fichier",self.my_file)
        file_content = open(self.my_file, "r")
        print(file_content.read())

dag_custom_operator.py

from airflow.decorators import task, dag
from datetime import datetime, timedelta
from include.customs_operators.custom_test_operator import MyTestOperator


@dag("custom_operator", start_date=datetime(2024,12,4) ,schedule=timedelta(minutes=2), catchup=False,
     max_active_runs=1, dagrun_timeout=timedelta(minutes=2),
     template_searchpath="/usr/local/airflow/include/sql/"
     )
def the_custom_operator():
     
     @task.bash
     def start():
          return "echo 'today is : {{ ds }}'"
     
     
     process_data = MyTestOperator(
          task_id="process_data",
          my_file="/usr/local/airflow/include/sql/query_1.sql",
          parameters={
               "table_name": "product"
          }
     )
               
     start() >> process_data

the_custom_operator()

query_1.sql

SELECT COUNT(*) FROM {{ parameters.table_name }};

Upvotes: 0

Views: 33

Answers (1)

Elad Kalif
Elad Kalif

Reputation: 16079

Your operator is missing template_ext which defines what file extensions (in your case .sql) are to be opened and rendered by the Jinja engine. You can also set template_fields_renderers which will define the style of the attribute in the render page of the UI.

Simply add:

class MyTestOperator(BaseOperator):
    ...
    template_ext = ("sql")
    template_fields_renderers = {"my_file": "sql"}
    ...

Upvotes: 0

Related Questions