arya.s

Reputation: 121

Getting java.lang.RuntimeException: driver class not found when JdbcHook is used more than once in an Airflow operator

The use case is to run a list of SQL statements in Hive and then update the Impala metadata. As shown below, the Hive and Impala methods each use JdbcHook. Whichever order I call these methods in, only the first one runs and the second one throws ERROR - java.lang.RuntimeException: Class <driver name of hive/impala> not found. Each method runs fine when used on its own. The execute method of the custom Airflow operator is shown below. Note: I can't use hive_operator to run the Hive statements, and I don't see any suitable methods in HiveServer2Hook. I am new to Airflow, so any help is much appreciated.

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.jdbc_hook import JdbcHook
import sqlparse

class CustomHiveOperator(BaseOperator):
    """
    Executes HQL code, then invalidates metadata and computes stats in Impala for that table.

    Requires JdbcHook and sqlparse.

    :param hive_jdbc_conn: reference to a predefined hive database
    :type hive_jdbc_conn: str
    :param impala_jdbc_conn: reference to a predefined impala database
    :type impala_jdbc_conn: str
    :param table_name: hive table name, used for post process in impala
    :type table_name: str
    :param script_path: HQL script path to run in Hive
    :type script_path: str
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
    :type autocommit: bool
    :param parameters: (optional) the parameters to render the SQL query with.
    :type parameters: mapping or iterable
    """
    
    
    @apply_defaults
    def __init__(
            self,
            hive_jdbc_conn: str,
            impala_jdbc_conn: str,
            table_name: str,
            script_path: str,
            autocommit=False,
            parameters=None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.hive_jdbc_conn = hive_jdbc_conn
        self.impala_jdbc_conn = impala_jdbc_conn
        self.table_name = table_name
        self.script_path = script_path
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        self.hive_run()
        self.impala_post_process()

    def format_string(self, x):
        # Drop the trailing semicolon that sqlparse.split keeps on each statement.
        return x.replace(";", "")
    
    
    def hive_run(self):
        with open(self.script_path) as f:
            data = f.read()
        # Split the script into individual statements before sending them over JDBC.
        hql_temp = sqlparse.split(data)
        hql = [self.format_string(x) for x in hql_temp]
        self.log.info('Executing: %s', hql)
        self.hive_hook = JdbcHook(jdbc_conn_id=self.hive_jdbc_conn)
        self.hive_hook.run(hql, self.autocommit, parameters=self.parameters)
        
    
    
    def impala_post_process(self):
        # Make the freshly written Hive table visible to Impala, then compute stats.
        invalidate = 'INVALIDATE METADATA ' + self.table_name
        compute_stats = 'COMPUTE STATS ' + self.table_name
        hql = [invalidate, compute_stats]
        self.log.info('Executing: %s', hql)
        self.impala_hook = JdbcHook(jdbc_conn_id=self.impala_jdbc_conn)
        self.impala_hook.run(hql, self.autocommit, parameters=self.parameters)

Upvotes: 0

Views: 311

Answers (1)

joebeeson

Reputation: 4366

This is actually an issue with how Airflow uses jaydebeapi and the underlying JPype modules to facilitate the JDBC connection.

A Java virtual machine is started when JPype is first used (on the first JdbcHook.get_conn call), and the only driver library the JVM is made aware of is the one for whichever JDBC connection is being made at that moment. When you create another connection, the JVM is already running and is not aware of the libraries needed for a different connection type.
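For illustration, here is roughly what happens under the hood (a sketch, not Airflow code; the jar paths are placeholders): jaydebeapi starts the JVM with the jars of the first connection, and a JVM that is already running cannot have its classpath extended this way.

import jpype

if not jpype.isJVMStarted():
    # First JdbcHook.get_conn: the JVM is started with only the first driver
    # jar on the classpath (placeholder path shown here).
    jpype.startJVM(
        jpype.getDefaultJVMPath(),
        "-Djava.class.path=/path/to/hive-jdbc-standalone.jar",
    )

# Second JdbcHook.get_conn: the JVM is already started, so the Impala driver
# jar is never added to the classpath and loading its driver class fails with
# java.lang.RuntimeException: Class ... not found.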

The only way I have found around this is to use an extension of JdbcHook which overrides the get_conn method to gather the paths of all JDBC drivers that are defined as Connection objects in Airflow. See the upstream JdbcHook.get_conn implementation in the Airflow source for reference.
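Here is a minimal sketch of such a subclass, assuming an Airflow 1.10-style JdbcHook that reads the extra__jdbc__drv_path and extra__jdbc__drv_clsname fields from a connection's extras; the class name AllDriversJdbcHook and the helper _all_driver_jars are made up for this example, so adjust the extra field keys to your Airflow version.

import jaydebeapi
from airflow import settings
from airflow.hooks.jdbc_hook import JdbcHook
from airflow.models import Connection


class AllDriversJdbcHook(JdbcHook):
    """JdbcHook variant that starts the JVM with every JDBC driver jar (hypothetical name)."""

    def _all_driver_jars(self):
        # Collect the driver jar paths of every 'jdbc' connection stored in the
        # Airflow metadata database, so the JVM sees all of them at startup.
        session = settings.Session()
        try:
            jdbc_conns = session.query(Connection).filter(
                Connection.conn_type == 'jdbc').all()
            jars = []
            for c in jdbc_conns:
                drv_path = c.extra_dejson.get('extra__jdbc__drv_path') or ''
                jars.extend(p for p in drv_path.split(',') if p)
            return jars
        finally:
            session.close()

    def get_conn(self):
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        driver_class = conn.extra_dejson.get('extra__jdbc__drv_clsname')
        # Passing all the jars here means the JVM, which JPype starts only once,
        # can load both the Hive and the Impala driver regardless of which hook
        # opened the first connection.
        return jaydebeapi.connect(
            jclassname=driver_class,
            url=str(conn.host),
            driver_args=[str(conn.login), str(conn.password)],
            jars=self._all_driver_jars(),
        )

Using this subclass in place of JdbcHook in both hive_run and impala_post_process means whichever method runs first starts the JVM with both the Hive and the Impala driver jars on its classpath.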

Upvotes: 1
