Reputation: 121
My use case is to run a list of SQL statements in Hive and then refresh the Impala metadata for the table. As shown below, the Hive and Impala methods both use JdbcHook. In whichever order I call them, only the first one runs; the second throws ERROR - java.lang.RuntimeException: Class <driver name of hive/impala> not found. Each method runs fine when used on its own.

Below is my custom Airflow operator (its execute method calls both). Note: I can't use hive_operator to run the Hive statements, and I don't see any suitable methods in HiveServer2Hook. I'm new to Airflow, so any help is much appreciated.
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.jdbc_hook import JdbcHook
import sqlparse


class CustomHiveOperator(BaseOperator):
    """
    Executes hql code and invalidates/computes stats in Impala for that table.
    Requires JdbcHook, sqlparse.

    :param hive_jdbc_conn: reference to a predefined hive database
    :type hive_jdbc_conn: str
    :param impala_jdbc_conn: reference to a predefined impala database
    :type impala_jdbc_conn: str
    :param table_name: hive table name, used for post-processing in impala
    :type table_name: str
    :param script_path: hql script path to run in hive
    :type script_path: str
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
    :type autocommit: bool
    :param parameters: (optional) the parameters to render the SQL query with.
    :type parameters: mapping or iterable
    """

    @apply_defaults
    def __init__(
            self,
            hive_jdbc_conn: str,
            impala_jdbc_conn: str,
            table_name: str,
            script_path: str,
            autocommit=False,
            parameters=None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.hive_jdbc_conn = hive_jdbc_conn
        self.impala_jdbc_conn = impala_jdbc_conn
        self.table_name = table_name
        self.script_path = script_path
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        self.hive_run()
        self.impala_post_process()

    def format_string(self, x):
        # Strip the semicolons that sqlparse.split leaves on each statement.
        return x.replace(";", "")

    def hive_run(self):
        with open(self.script_path) as f:
            data = f.read()
        hql_temp = sqlparse.split(data)
        hql = [self.format_string(x) for x in hql_temp]
        self.log.info('Executing: %s', hql)
        self.hive_hook = JdbcHook(jdbc_conn_id=self.hive_jdbc_conn)
        self.hive_hook.run(hql, self.autocommit, parameters=self.parameters)

    def impala_post_process(self):
        invalidate = 'INVALIDATE METADATA ' + self.table_name
        compute_stats = 'COMPUTE STATS ' + self.table_name
        hql = [invalidate, compute_stats]
        self.log.info('Executing: %s', hql)
        self.impala_hook = JdbcHook(jdbc_conn_id=self.impala_jdbc_conn)
        self.impala_hook.run(hql, self.autocommit, parameters=self.parameters)
Upvotes: 0
Views: 311
Reputation: 4366
This is actually an issue with how Airflow uses the jaydebeapi and underlying JPype modules to facilitate the JDBC connection.

A Java virtual machine is started when JPype is first used (on the first JdbcHook.get_conn call), and the only driver library the virtual machine is made aware of is the one for whichever JDBC connection is being made. When you create another connection, the virtual machine is already started and isn't aware of the libraries necessary for a different connection type.
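For illustration, here is a bare jaydebeapi sketch of that failure mode; the driver class names, hosts, and jar paths below are placeholders, not values from your setup:

import jaydebeapi

# First connection: JPype starts the JVM here, with only the Hive driver jar
# on its classpath.
hive_conn = jaydebeapi.connect(
    "org.apache.hive.jdbc.HiveDriver",
    "jdbc:hive2://hive-host:10000/default",
    ["user", "password"],
    "/opt/jdbc/hive-jdbc.jar",
)

# Second connection: the JVM is already running, so the Impala jar passed
# here never reaches the classpath and the driver class can't be loaded,
# which surfaces as java.lang.RuntimeException: Class ... not found.
impala_conn = jaydebeapi.connect(
    "com.cloudera.impala.jdbc41.Driver",
    "jdbc:impala://impala-host:21050/default",
    ["user", "password"],
    "/opt/jdbc/impala-jdbc.jar",
)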
The only way that I have found around this is to use an extension of JdbcHook which overrides the get_conn method to gather the paths of all JDBC drivers that are defined as a Connection object in Airflow. See here for the Airflow implementation.
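A minimal sketch of such a hook, assuming Airflow 1.x (airflow.hooks.jdbc_hook) and the stock 'extra__jdbc__drv_path' / 'extra__jdbc__drv_clsname' connection extras; the class name MultiDriverJdbcHook and the metadata-database query are my own additions, so adapt them to your version:

import jaydebeapi
from airflow import settings
from airflow.hooks.jdbc_hook import JdbcHook
from airflow.models import Connection


class MultiDriverJdbcHook(JdbcHook):
    """JdbcHook that starts the JVM with every registered JDBC driver jar."""

    @staticmethod
    def _all_jdbc_driver_paths():
        # Collect the driver jar paths of every JDBC connection defined
        # in the Airflow metadata database.
        session = settings.Session()
        try:
            jars = []
            for conn in session.query(Connection).filter(Connection.conn_type == 'jdbc'):
                drv_path = conn.extra_dejson.get('extra__jdbc__drv_path')
                if drv_path:
                    jars.extend(drv_path.split(','))
            return jars
        finally:
            session.close()

    def get_conn(self):
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        # Pass *all* driver jars so the JVM, which is only started once,
        # can load both the Hive and the Impala driver classes.
        return jaydebeapi.connect(
            jclassname=conn.extra_dejson.get('extra__jdbc__drv_clsname'),
            url=str(conn.host),
            driver_args=[str(conn.login), str(conn.password)],
            jars=self._all_jdbc_driver_paths(),
        )

With something like this in place, hive_run and impala_post_process can build their hooks exactly as they do today, just substituting the subclass; whichever method runs first starts the JVM with both driver jars available.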
Upvotes: 1