LucaD

Reputation: 23

Flink's PostgreSQL JDBC sink reWriteBatchedInserts not working as expected

I'm developing a Flink streaming application. One of my sinks writes to a TimescaleDB hypertable through a Postgres JdbcSink, and the connection URL specifies reWriteBatchedInserts=true.

Flink version is 1.16.2
Timescale version is 2.11.0
Postgres version is 15.3
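
For reference, this is the shape of the connection URL (host, port, and database name below are placeholders, not the real values):

// Hypothetical URL; only the reWriteBatchedInserts parameter matters here.
// It tells pgjdbc to rewrite a batch of single-row inserts into multi-row
// "insert ... values (...),(...),..." statements.
String url = "jdbc:postgresql://localhost:5432/mydb?reWriteBatchedInserts=true";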

This is the sink code (field names are anonymized, everything else is unchanged); note the call to withBatchSize(1000):

SinkFunction<EnrichedDataTuple> dataSink = JdbcSink.sink(
        "insert into mytable (a, b, c, d, e, f, g) values (?,?,?,?,?,?,?) on conflict do nothing",
        (statement, row) -> {
            statement.setInt(1, row.getA());
            statement.setInt(2, row.getB());
            statement.setInt(3, row.getC());
            statement.setInt(4, row.getD());
            statement.setInt(5, row.getE());
            statement.setInt(6, row.getF());
            statement.setInt(7, row.getG());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000) // flush a batch every 1000 buffered rows
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(appConfiguration.getString(ConfigOptions.key("timescale.uri").stringType().noDefaultValue()))
                .withUsername(appConfiguration.getString(ConfigOptions.key("timescale.user").stringType().noDefaultValue()))
                .withPassword(appConfiguration.getString(ConfigOptions.key("timescale.password").stringType().noDefaultValue()))
                .withDriverName("org.postgresql.Driver")
                .build());

I expected the rewritten insert queries to process up to 1000 rows at once; instead, every query inserts no more than 128 rows.

I enabled Postgres 'all' statement logging to check the behavior and, as stated above, specifying withBatchSize(N) with N <= 128 works as expected, while any N > 128 has the same effect as N = 128. It looks like there is a hard cap at 128, but as far as I know the limits should be 1000 rows and 32767 bind parameters per statement.

All queries look like this:

insert into public.mytable (a, b, c, d, e, f, g) values ($1,$2,$3,$4,$5,$6,$7),  
($8,$9,$10,$11,$12,$13,$14),($15,$16,$17,$18,$19,$20,$21),  
($22,$23,$24,$25,$26,$27,$28),($29,$30,$31,$32,$33,$34,$35),  
...  
...  
...
($883,$884,$885,$886,$887,$888,$889),($890,$891,$892,$893,$894,$895,$896) on conflict do nothing

That's 128 rows and 896 (128 × 7) bind parameters.
Are there other Postgres parameters to tune or JDBC options to set?

Thank you

Edit: I found the "problem", led there by the question "Why is Hibernate splitting my batch insert into 3 queries":

@Override
  protected void transformQueriesAndParameters() throws SQLException {
    ArrayList<@Nullable ParameterList> batchParameters = this.batchParameters;
    if (batchParameters == null || batchParameters.size() <= 1
        || !(preparedQuery.query instanceof BatchedQuery)) {
      return;
    }
    BatchedQuery originalQuery = (BatchedQuery) preparedQuery.query;
    // Single query cannot have more than {@link Short#MAX_VALUE} binds, thus
    // the number of multi-values blocks should be capped.
    // Typically, it does not make much sense to batch more than 128 rows: performance
    // does not improve much after updating 128 statements with 1 multi-valued one, thus
    // we cap maximum batch size and split there.
    final int bindCount = originalQuery.getBindCount();
    final int highestBlockCount = 128;
    final int maxValueBlocks = bindCount == 0 ? 1024 /* if no binds, use 1024 rows */
        : Integer.highestOneBit( // deriveForMultiBatch supports powers of two only
            Math.min(Math.max(1, maximumNumberOfParameters() / bindCount), highestBlockCount));

in https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/jdbc/PgPreparedStatement.java
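
Plugging in this sink's numbers makes the cap visible (a quick check; I'm assuming maximumNumberOfParameters() resolves to Short.MAX_VALUE here, as the comment in the driver source suggests):

// Reproducing the driver's computation for this insert: 7 binds per row.
int bindCount = 7;                // columns a..g
int highestBlockCount = 128;      // the hardcoded cap
int maxParams = Short.MAX_VALUE;  // assumed value of maximumNumberOfParameters()
int maxValueBlocks = Integer.highestOneBit(
    Math.min(Math.max(1, maxParams / bindCount), highestBlockCount));
// 32767 / 7 = 4681 -> min(4681, 128) = 128 -> highestOneBit(128) = 128
// So each rewritten statement carries 128 rows and 128 * 7 = 896 bind
// parameters, exactly what the query log shows.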

Too bad that the limit of 128 is hardcoded; really not cool! I wonder whether rebuilding the driver with a greater limit would make sense...

Upvotes: 0

Views: 369

Answers (0)
