Reputation: 21
Hello, I have written code for a streaming job where both the source and the target are a PostgreSQL database. I used JDBCInputFormat/JDBCOutputFormat to read and write the records (referenced example). Code:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
        .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
            @Override
            public long extractAscendingTimestamp(Row row) {
                Date dt = (Date) row.getField(2);
                return dt.getTime();
            }
        })
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .fold(null, new FoldFunction<Row, Row>() {
            @Override
            public Row fold(Row row1, Row row) throws Exception {
                return row;
            }
        });

source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(JDBCConfig.DRIVER_CLASS)
        .setDBUrl(JDBCConfig.DB_URL)
        .setQuery("insert into tablename (id, name) values (?, ?)")
        .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
        .finish());
This code executes correctly, but it does not run continuously on the Flink server: the select query is executed only once. I expected it to keep running continuously on the Flink server.
Upvotes: 2
Views: 1126
Reputation: 2007
Solution 1: if you keep using this deprecated DataSet API, the secret to making it query continuously lies in the query statement and its parameters. Here is a sample:
import java.io.Serializable;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class FlinkPostgresDataSource {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // PostgreSQL connection parameters
        String driverName = "org.postgresql.Driver";
        String dbUrl = "jdbc:postgresql://localhost:5432/mydatabase";
        String username = "username";
        String password = "password";

        // Query parameters
        String query = "SELECT id, name FROM your_table WHERE id >= ? AND id < ?";
        int batchSize = 100; // Number of records per batch
        int startId = 0;     // Initial start id

        // Build JDBCInputFormat for PostgreSQL
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(dbUrl)
                .setQuery(query)
                .setUsername(username)
                .setPassword(password)
                .setRowTypeInfo(new RowTypeInfo(
                        org.apache.flink.api.common.typeinfo.Types.INT,
                        org.apache.flink.api.common.typeinfo.Types.STRING))
                .setFetchSize(batchSize)
                .setParametersProvider(new ParameterValuesProvider() {
                    @Override
                    public Serializable[][] getParameterValues() {
                        // One split; its two values bind the two '?' placeholders in order
                        int endId = startId + batchSize;
                        return new Serializable[][] { { startId, endId } };
                    }
                })
                .finish();

        // Create DataSet from JDBCInputFormat
        env.createInput(jdbcInputFormat)
                .map(new org.apache.flink.api.common.functions.MapFunction<Row, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Row row) throws Exception {
                        return Tuple2.of((Integer) row.getField(0), (String) row.getField(1));
                    }
                })
                .print(); // print() is an eager sink in the DataSet API: it triggers
                          // execution itself, so no separate env.execute() call is needed
    }
}
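For context on the setParametersProvider call above: each row of the Serializable[][] returned by the provider becomes one input split, and the values inside that row bind the query's ? placeholders in order. The legacy flink-jdbc module also ships ready-made providers such as GenericParameterValuesProvider. A minimal sketch (the id ranges are made-up values for the same placeholder query):

import java.io.Serializable;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;

// Three splits for "SELECT id, name FROM your_table WHERE id >= ? AND id < ?",
// each row binding one (startId, endId) pair; splits can be read in parallel.
Serializable[][] queryParameters = new Serializable[][] {
        { 0, 100 },
        { 100, 200 },
        { 200, 300 }
};

// Use instead of the anonymous provider in the builder above:
// .setParametersProvider(new GenericParameterValuesProvider(queryParameters))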
Solution 2: use the DataStream API with a custom source function instead:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

public class FlinkJdbcExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add JDBC source function
        env.addSource(new JdbcSourceFunction())
                .print();

        // Execute the program
        env.execute("Flink JDBC Example");
    }

    public static class JdbcSourceFunction implements SourceFunction<Row> {

        private static final int FETCH_SIZE = 100;

        private volatile boolean isRunning = true;
        private transient Connection connection;
        private transient PreparedStatement preparedStatement;

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
            preparedStatement = connection.prepareStatement("SELECT * FROM your_table");
            preparedStatement.setFetchSize(FETCH_SIZE);

            while (isRunning) {
                try (ResultSet resultSet = preparedStatement.executeQuery()) {
                    while (resultSet.next()) {
                        // Convert ResultSet row to Flink Row object
                        Row row = Row.of(resultSet.getInt("id"), resultSet.getString("name"));
                        ctx.collect(row);
                    }
                }
                Thread.sleep(1000); // Example: sleep for 1 second before querying again
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            try {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e) {
                // Log or handle the exception
            }
        }
    }
}
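One caveat with the loop above: every poll re-runs SELECT * and re-emits the whole table. A possible refinement (a sketch of mine, not part of the original answer) is to remember the highest id seen so far and fetch only newer rows. It assumes a monotonically increasing id column and reuses the placeholder connection settings and imports from above; for fault tolerance you would additionally want to snapshot lastSeenId via CheckpointedFunction.

public static class IncrementalJdbcSourceFunction implements SourceFunction<Row> {

    private volatile boolean isRunning = true;
    private long lastSeenId = 0L; // not checkpointed in this sketch

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        try (Connection connection = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
             PreparedStatement stmt = connection.prepareStatement(
                     "SELECT id, name FROM your_table WHERE id > ? ORDER BY id")) {
            while (isRunning) {
                stmt.setLong(1, lastSeenId);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // Advance the watermark id so the next poll skips these rows
                        lastSeenId = rs.getLong("id");
                        ctx.collect(Row.of(lastSeenId, rs.getString("name")));
                    }
                }
                Thread.sleep(1000); // poll interval between incremental queries
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}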
Upvotes: 0
Reputation: 1294
Probably you have to define your own Flink source or JDBCInputFormat, since the one you use here stops the SourceTask once it has fetched all results from the DB. One way to solve this is to create your own JDBC input format based on JDBCInputFormat that re-executes the SQL query when nextRecord has read the last row from the DB.
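Since the internals of the shipped JDBCInputFormat are mostly private, one way to sketch this idea is a from-scratch format built on Flink's GenericInputFormat (connection details, column names, and the 1-second poll interval are assumptions; reachedEnd() always returning false is what keeps the source task alive):

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.types.Row;

// Sketch: an input format that re-runs the query whenever the
// current result set is exhausted, so the source never finishes.
public class ContinuousJdbcInputFormat extends GenericInputFormat<Row> {

    private transient Connection connection;
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        try {
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
            statement = connection.prepareStatement("SELECT id, name FROM your_table");
            resultSet = statement.executeQuery();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public boolean reachedEnd() {
        return false; // never signal end-of-input: keep the source task running
    }

    @Override
    public Row nextRecord(Row reuse) throws IOException {
        try {
            // When the current result set runs out, wait and re-execute the query
            while (!resultSet.next()) {
                resultSet.close();
                Thread.sleep(1000);
                resultSet = statement.executeQuery();
            }
            reuse.setField(0, resultSet.getInt("id"));
            reuse.setField(1, resultSet.getString("name"));
            return reuse;
        } catch (SQLException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (resultSet != null) resultSet.close();
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}

It would plug into the question's pipeline as environment.createInput(new ContinuousJdbcInputFormat(), JDBCConfig.ROW_TYPE_INFO). Note that each parallel instance would run the same full query, so run it with parallelism 1 to avoid duplicate reads.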
Upvotes: 1