Mul
Mul

Reputation: 21

How to run apache flink streaming job continuously on Flink server

Hello, I have written code for a streaming job whose source and target are both a PostgreSQL database. I used JDBCInputFormat/JDBCOutputFormat to read and write the records (see the referenced example). Code:

// Streaming job: read rows over JDBC, keep one row per key per 5-second
// event-time window, and insert the result back over JDBC.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // Windows below are driven by timestamps extracted from the rows, not by wall-clock time.
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Source definition: driver, URL, select statement and row layout all come from JDBCConfig.
    // NOTE(review): as reported in the question, this select executes only once —
    // presumably the input format ends after its result set is exhausted; confirm.
    JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
            .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

    SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
                @Override
                public long extractAscendingTimestamp(Row row) {
                    // Field 2 is cast to Date and its epoch-millisecond value becomes the event timestamp.
                    Date dt = (Date) row.getField(2);
                    return dt.getTime();
                }
            })
            // Key by field 0 and group into tumbling 5-second event-time windows.
            .keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
            // The fold starts from null and always returns the incoming row, discarding the
            // accumulator, so the folded value is the most recently folded row.
            .fold(null, new FoldFunction<Row, Row>(){
                @Override
                public Row fold(Row row1, Row row) throws Exception {
                    return row;
                }
            });

    // Sink: each resulting row is written with a two-parameter insert (BIGINT id, VARCHAR name).
    source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery("insert into tablename(id, name) values (?,?)")
            .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
            .finish());

This code executes correctly, but it does not run continuously on the Flink server (the select query is executed only once). I expect it to run continuously on the Flink server.

Upvotes: 2

Views: 1126

Answers (2)

LIU YUE
LIU YUE

Reputation: 2007

Solution 1: if you keep using the deprecated DataSet API, the secret to making it query continuously lies in the query statement. Here is a sample:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class FlinkPostgresDataSource {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // PostgreSQL connection parameters
        String driverName = "org.postgresql.Driver";
        String dbUrl = "jdbc:postgresql://localhost:5432/mydatabase";
        String username = "username";
        String password = "password";

        // Query parameters
        String query = "SELECT id, name FROM your_table WHERE id >= ? AND id < ?";
        int batchSize = 100; // Number of records per batch
        int startId = 0; // Initial start id

        // Build JDBCInputFormat for PostgreSQL
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(dbUrl)
                .setQuery(query)
                .setUsername(username)
                .setPassword(password)
                .setRowTypeInfo(new RowTypeInfo(org.apache.flink.api.common.typeinfo.Types.INT, org.apache.flink.api.common.typeinfo.Types.STRING))
                .setFetchSize(batchSize)
                .setParametersProvider(new JDBCInputFormat.ParameterProvider() {
                    @Override
                    public Object[][] getParameters() {
                        int endId = startId + batchSize;
                        return new Object[][] { { startId }, { endId } };
                    }
                })
                .finish();

        // Create DataSet from JDBCInputFormat
        env.createInput(jdbcInputFormat)
                .map(new org.apache.flink.api.common.functions.MapFunction<Row, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Row row) throws Exception {
                        return Tuple2.of(row.getField(0), row.getField(1));
                    }
                })
                .print(); // Example: Output to stdout

        // Execute the Flink job
        env.execute("Flink PostgreSQL DataSource");
    }
}

Solution 2: use the DataStream API instead

public class FlinkJdbcExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add JDBC source function
        env.addSource(new JdbcSourceFunction())
           .print();

        // Execute the program
        env.execute("Flink JDBC Example");
    }

    public static class JdbcSourceFunction implements SourceFunction<Row> {
    private volatile boolean isRunning = true;
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/your_database", "your_username", "your_password");
        preparedStatement = connection.prepareStatement("SELECT * FROM your_table");
        preparedStatement.setFetchSize(fetchSize); // Set fetchSize

        while (isRunning) {
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                // Convert ResultSet row to Flink Row object
                Row row = Row.of(resultSet.getInt("id"), resultSet.getString("name"));
                ctx.collect(row);
            }
            Thread.sleep(1000); // Example: Sleep for 1 second before querying again
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Log or handle the exception
        }
    }
}

From Flink troubleshooting

Upvotes: 0

BrightFlow
BrightFlow

Reputation: 1294

Probably you have to define your own Flink source or JDBCInputFormat, since the one you use here stops the SourceTask once it has fetched all results from the DB. One way to solve this is to create your own JDBC input format based on JDBCInputFormat that re-executes the SQL query when nextRecord reads the last row from the DB.

Upvotes: 1

Related Questions