MamtaJ

Reputation: 1

Hive query executeQuery() hangs in Java JDBC code

I have created a UDTF, and inside it I'm running the Java Hive JDBC code below to execute a Hive query and fetch the results. I can connect to the HiveServer2 successfully, but the code hangs indefinitely at statement.executeQuery() without throwing any exception. What could be the reason? The same code runs fine as a standalone class in Eclipse, but hangs when deployed as a UDTF on a Hadoop cluster.


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.security.UserGroupInformation;

public class DynamicWhereUDTF extends GenericUDTF {
    private PrimitiveObjectInspector stringOI = null;
    ArrayList<Object[]> results = new ArrayList<Object[]>();

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {

        stringOI = (PrimitiveObjectInspector) args[0];
        if (stringOI != null) {
            String name = stringOI.toString();
            System.out.println("param <-------> " + name);
        }

        List<String> fieldNames = new ArrayList<String>();
        try {
            fieldNames = getColumnNames("d_drug");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        System.out.println("fieldNames size ---> " + fieldNames.size());
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        for (int i = 0; i < fieldNames.size(); i++) {
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }

        System.out.println("----------ObjectInspectorFactory created------------ ");
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement statement = null;
        try {
            System.out.println("Processing records 1");
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            System.out.println("Processing records 2");
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("[email protected]", "/tmp/abc.keytab");
            System.out.println("Processing records 3");
            String hiveJdbcUrl = "jdbc:hive2://<host>:10000/demo_db;principal=hive/<host>@CS.MSD";
            conn = DriverManager.getConnection(hiveJdbcUrl, "abc", "");
            System.out.println("conn1 <-------> " + conn);
            statement = conn.prepareStatement("select * from xyz limit 5");
            System.out.println(" statement ----------> " + statement);
            rs = statement.executeQuery();
            System.out.println(" resultset ----------> " + rs);
            ResultSetMetaData rsMetaData = rs.getMetaData();
            int columnCount = rsMetaData.getColumnCount();
            System.out.println("columnCount ---> " + columnCount);
            // ArrayList<Object[]> results = new ArrayList<Object[]>();
            StringBuilder values = new StringBuilder();

            while (rs.next()) {
                values = new StringBuilder();
                for (int i = 0; i < columnCount; i++) {
                    values = values.append(rs.getString(i + 1)).append(",");
                }
                String output = values.toString().substring(0,
                        values.lastIndexOf(","));
                System.out.println("output  -----> " + output);
                results.add(new Object[] {"122556", "52905"});
            }
            System.out.println("------- results forwarded -------");

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (conn != null)
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up here; the connection is already closed in initialize()

    }

    @Override
    public void process(Object[] record) throws HiveException {

        try {
            Iterator<Object[]> it = results.iterator();
            while (it.hasNext()) {
                Object[] r = it.next();
                forward(r);
            }
            System.out.println("------- results forwarded -------");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public List<String> getColumnNames(String tableName) throws SQLException {
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("drug_id");
        fieldNames.add("drug_cd");
        return fieldNames;
    }
}

Upvotes: 0

Views: 600

Answers (1)

serge_k

Reputation: 1772

The problem might be that you are creating the connection in the initialize method. Try creating the connection in the configure method instead; you can look at the HBase connector as an example.
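
For reference, here is a minimal sketch of that approach, not a drop-in fix: the JDBC URL, user, query, and output columns are assumptions copied from the question, and configure(MapredContext) is the GenericUDTF hook that Hive calls inside the running task.

// Sketch only: the key point is that the JDBC connection is opened in
// configure(), which Hive invokes on the task side, rather than in
// initialize(). URL, user, table, and columns are taken from the question.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ConnectionInConfigureUDTF extends GenericUDTF {
    private transient Connection conn;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        // Only declare the output schema here; no JDBC work.
        List<String> fieldNames = Arrays.asList("drug_id", "drug_cd");
        List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    @Override
    public void configure(MapredContext context) {
        // Called inside the running task, after initialize(): a safer place
        // for environment-dependent setup such as JDBC connections.
        try {
            conn = DriverManager.getConnection(
                    "jdbc:hive2://<host>:10000/demo_db;principal=hive/<host>@CS.MSD",
                    "abc", "");
        } catch (SQLException e) {
            throw new RuntimeException("Could not open Hive connection", e);
        }
    }

    @Override
    public void process(Object[] record) throws HiveException {
        // A fresh statement per call keeps the sketch short; reuse it in
        // real code if process() is invoked many times.
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select * from xyz limit 5")) {
            while (rs.next()) {
                forward(new Object[] { rs.getString(1), rs.getString(2) });
            }
        } catch (SQLException e) {
            throw new HiveException(e);
        }
    }

    @Override
    public void close() throws HiveException {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            throw new HiveException(e);
        }
    }
}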

Upvotes: 0
