Reputation: 305
I am using Flink Jdbc Sink to push data into Postgres tables. The data has to be stored in different schemas having the same database connection.
DataStream<Book> stream = env.fromSource(...);
Each Book record in the stream has details about the schema it has to be stored in.
I tried to parameterize the database schema name in the PreparedStatement, but as expected, that is not allowed by SQL.
stream.addSink(JdbcSink.sink(
"insert into ?.books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setString(1, book.schema)
statement.setLong(2, book.id);
statement.setString(3, book.title);
statement.setString(4, book.authors);
statement.setInt(5, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));
Is there a workaround for this? Or do I have to explicitly add each schema in a separate sink?
Upvotes: 1
Views: 1029
Reputation: 305
Let's say that we have an array of Strings where all the database schemas are listed (can be pre-configured via application properties).
We can filter the stream based on the schema name and create an individual jdbc sink for each schema.
String[] schemas = {"schema-1", "schema-2", "schema-3"};
for(int i=0; i<schemas.length; i++){
stream.filter(x -> x.schema.equals(schemas[i])).addSink(JdbcSink.sink(
"insert into " + schemas[i] ".books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
.withDriverName("org.postgresql.Driver")
.withUsername("someUser")
.withPassword("somePassword")
.build()
));
}
Upvotes: 2