Reputation: 163
How can we pass data by using a custom operator task ? Here is my code. The sql file does not take the value of the key table_name. The rendered template suposed to be
SELECT COUNT(*) FROM product;
custom_test_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyTestOperator(BaseOperator):
template_fields= ("my_file","parameters",)
@apply_defaults
def __init__(self, my_file, parameters, *args, **kwargs):
super(MyTestOperator, self).__init__(*args,**kwargs)
self.my_file= my_file
self.parameters=parameters
def execute(self, context):
print("le fichier",self.my_file)
file_content = open(self.my_file, "r")
print(file_content.read())
dag_custom_operator.py
from airflow.decorators import task, dag
from datetime import datetime, timedelta
from include.customs_operators.custom_test_operator import MyTestOperator
@dag("custom_operator", start_date=datetime(2024,12,4) ,schedule=timedelta(minutes=2), catchup=False,
max_active_runs=1, dagrun_timeout=timedelta(minutes=2),
template_searchpath="/usr/local/airflow/include/sql/"
)
def the_custom_operator():
@task.bash
def start():
return "echo 'today is : {{ ds }}'"
process_data = MyTestOperator(
task_id="process_data",
my_file="/usr/local/airflow/include/sql/query_1.sql",
parameters={
"table_name": "product"
}
)
start() >> process_data
the_custom_operator()
query_1.sql
SELECT COUNT(*) FROM {{ parameters.table_name }};
Upvotes: 0
Views: 33
Reputation: 16079
Your operator is missing template_ext
which defines what file extensions (in your case .sql
) are to be opened and rendered by the Jinja engine. You can also set template_fields_renderers
which will define the style of the attribute in the render page of the UI.
Simply add:
class MyTestOperator(BaseOperator):
...
template_ext = ("sql")
template_fields_renderers = {"my_file": "sql"}
...
Upvotes: 0