user3138594

Reputation: 219

Passing template variables to HiveOperator

I have a Jinja template which I plan to use for dynamic SQL generation in Hive. My template looks as follows:

USE {{ db }};

CREATE EXTERNAL TABLE IF NOT EXISTS foo (
    A int,
    B int
)
stored as parquet
location '….';

"db" is something that can be derived by making a function call. I decided to write an operator extending HiveExecOperator. In my environment the class hierarchy is:

BaseOperator <-- BaseExecOperator <-- HiveExecOperator

My TestHive operator looks like the following:

class TestHive(HiveExecOperator):
    def pre_execute(self, context):
        context['db'] = func1(…,,)
        return context['ti'].render_templates()

This does not work: {{ db }} inside the template is never populated, so the Hive statement fails. I also tried overriding render_template in TestHive as follows:

class TestHive(HiveExecOperator):
    def render_template(self, attr, content, context):
        context['db'] = func1(..,)
        return super(TestHive, self).render_templates(attr, content, context)

This one fails because the parent class of TestHive doesn't have a render_templates method.

The method "render_templates" is only defined in BaseOperator.

Any help is appreciated.

Upvotes: 1

Views: 1546

Answers (1)

Mendhak

Reputation: 8785

Assuming you mean HiveOperator and not HiveExecOperator, and having a look at what you're describing, I don't believe you should need to derive any kind of operator here. Unless there's some extra missing info which I'm not seeing, you're simply asking how to pass the value of a function call as a parameter into a templated command.

The hql argument of HiveOperator is a template field. That means you should be able to simply define your template as you've done already and then provide the value to it as part of that Operator call. But remember to reference the variable in the template with the params prefix, as in {{ params.db }}. See:

my_query = """
    USE {{ params.db }};

    CREATE EXTERNAL TABLE IF NOT EXISTS foo (
        A int,
        B int
    )
    stored as parquet
    location .......
    """

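# Import path for Airflow 2.x with the Hive provider installed;
# on Airflow 1.x it is airflow.operators.hive_operator.
from airflow.providers.apache.hive.operators.hive import HiveOperator

run_hive_query = HiveOperator(
    task_id="my_task",
    hql=my_query,
    # params is built when the DAG file is parsed, so func1 runs at parse time
    params={'db': func1(...)},
    dag=dag
)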
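
To see why the prefix matters, the rendering step can be reproduced outside Airflow with plain Jinja2. This is only a sketch under assumptions: func1 here is a hypothetical stand-in that returns a made-up database name, and passing params by hand mimics how Airflow exposes the operator's params dict to the template.

```python
from jinja2 import Template


def func1():
    # Hypothetical stand-in for the function that derives the database name.
    return "analytics"


# Mimic the template context: the params dict is available under the name "params".
context = {"params": {"db": func1()}}

# With the params prefix, the value is found and substituted.
rendered = Template("USE {{ params.db }};").render(**context)
print(rendered)  # USE analytics;

# Without the prefix, "db" is not a top-level name in the context.
# Jinja2's default behaviour renders an undefined name as an empty string.
unprefixed = Template("USE {{ db }};").render(**context)
print(unprefixed)  # USE ;
```

The second render mirrors the symptom in the question: a name missing from the template context silently renders as an empty string by default rather than raising an error, which produces an invalid USE statement.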

Upvotes: 3
