Shweta

Reputation: 1

Apache Flink JDBC InputFormat throwing java.net.SocketException: Socket closed

I am querying an Oracle database using the Flink DataSet API. For this I have customised Flink's JDBCInputFormat to return java.sql.ResultSet, because I need to perform further operations on the result set using Flink operators.

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    @SuppressWarnings("unchecked")
    DataSource<ResultSet> source
            = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
                    .setUsername("username")
                    .setPassword("password")
                    .setDrivername("driver_name")
                    .setDBUrl("jdbcUrl")
                    .setQuery("query")
                    .finish(),      
                    new GenericTypeInfo<ResultSet>(ResultSet.class)
            );
    source.print();

    environment.execute();

} 

Following is the customised JDBCInputFormat:

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable {

    @Override
    public void open(InputSplit inputSplit) throws IOException {
        try {
            Class.forName(drivername);
            dbConn = DriverManager.getConnection(dbURL, username, password);
            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            resultSet = statement.executeQuery();
        } catch (ClassNotFoundException | SQLException e) {
            throw new IOException("Could not open connection and execute query", e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (statement != null) {
                statement.close();
            }
            if (dbConn != null) {
                dbConn.close();
            }
        } catch (SQLException e) {
            throw new IOException("Could not close JDBC resources", e);
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        try {
            isLastRecord = resultSet.isLast();
            return isLastRecord;
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public ResultSet nextRecord(ResultSet row) throws IOException {
        try {
            if (!isLastRecord) {
                resultSet.next();
            }
            return resultSet;
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}

This works with a query that limits the number of rows fetched, e.g. SELECT a, b, c FROM xyz WHERE rownum <= 10;. But when I try to fetch all rows (approximately 1 million), I get the exception below after a seemingly random number of rows has been fetched:

java.sql.SQLRecoverableException: Io exception: Socket closed
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101)
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521)
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024)
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314)
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228)
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839)
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823)
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349)
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite0(Native Method)

How can I solve this issue in my case?

Upvotes: 0

Views: 801

Answers (1)

Fabian Hueske

Reputation: 18987

I don't think it is possible to ship a ResultSet like a regular record. A ResultSet is a stateful object that internally maintains a connection to the database server. Using a ResultSet as a record that is transferred between Flink operators means that it can be serialized, shipped over the network to another machine, deserialized, and handed to a different thread in a different JVM process. That does not work.

Depending on the execution setup, a ResultSet might also stay on the same machine in the same thread, which is probably what happened in the case that worked for you. If you want to query a database from within an operator, you could implement the function as a RichMapPartitionFunction and open the connection in its open() method. Otherwise, I'd read the ResultSet in the data source and forward the resulting rows as regular records.
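A sketch of the second suggestion, using the JDBCInputFormat bundled in Flink's flink-jdbc module, which (in Row-based form, roughly Flink 1.2+) reads the ResultSet inside the source and emits each row as a serializable Row record. The driver class, URL, credentials, query, and column types below are placeholders, not values from your setup:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class JdbcRowSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Column types of the query result (a, b, c) -- adjust to your schema.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO);

        // The ResultSet is consumed inside the input format; only plain
        // Row records leave the source, so they can be freely shipped
        // between operators.
        DataSet<Row> rows = env.createInput(
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("oracle.jdbc.OracleDriver")
                        .setDBUrl("jdbc:oracle:thin:@//host:1521/service")
                        .setUsername("username")
                        .setPassword("password")
                        .setQuery("SELECT a, b, c FROM xyz")
                        .setRowTypeInfo(rowTypeInfo)
                        .finish());

        // Downstream operators now work on Row records, not on a ResultSet.
        rows.first(10).print();
    }
}
```

Since the rows are materialized as records in the source, the database connection never has to outlive the source's open()/close() lifecycle, which avoids the closed-socket problem.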

Upvotes: 1
