shachar
shachar

Reputation: 683

Spring reactive db (r2dbc) insert data multithread issue

I'm trying to insert data into a PostgreSQL table in parallel using Spring R2DBC.
No matter how I run it — either with separate connections, or with a single connection running batches/statements in parallel — execution always falls back to a single thread (`reactor-tcp-nio-1`) and runs serially instead of in parallel, which hurts performance.

Multiple connections

  @Test
    void testMultipleStatementBatch2() {
        // 100 units of work fanned out over parallel rails: each obtains a connection from
        // connectionFactory.create(), runs the 10-row batch on it, and closes it when done.
        // NOTE(review): the captured debug output shows later indexes being handled on
        // reactor-tcp-nio-1, i.e. runOn() does not keep signals emitted by the driver on the
        // parallel scheduler; check the driver/pool event-loop setup if real parallelism is wanted.
        final Flux<List<Tuple2<Integer, String>>> perIndexResults = Flux.range(0, 100)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(idx -> Flux.usingWhen(
                        connectionFactory.create(),
                        conn -> {
                            log.debug("Running index:{}", idx);
                            return batch(conn);
                        },
                        Connection::close))
                .doOnNext(rows -> log.debug("result:{}", rows))
                .sequential();
        // Block the test thread until every rail completes, then dump all returned rows.
        final List<List<Tuple2<Integer, String>>> result = perIndexResults.collectList().block();
        log.debug("list:{}", gson.toJson(result));
    }

 /**
  * Inserts ten rows (keys 0..9) into {@code my_test} with a single {@link Batch} on the given
  * connection and collects the {@code (key, value)} pair of every row returned by
  * {@code RETURNING *}.
  *
  * <p>The connection is not closed here; callers wrap this call in {@code usingWhen(...,
  * Connection::close)}.
  *
  * @param connection an open connection to run the batch on
  * @return a Mono emitting the list of {@code (key, value)} tuples, in statement order
  */
 private Mono<List<Tuple2<Integer, String>>> batch(Connection connection) {
        log.debug("start running");
        final Batch batch = connection.createBatch();
        // IntStream.forEach hands us an int directly; boxing every index first bought nothing.
        IntStream.range(0, 10)
                .forEach(index -> batch.add("INSERT INTO my_test(key,value,comment) "
                        + "values ('" + index + "','value_" + index + "','comment_" + index + "')"
                        + " RETURNING *"));
        // concatMap (not flatMap): consume each statement's Result fully before the next one, so the
        // collected tuples are guaranteed to follow the order of the statements in the batch.
        return Flux.from(batch.execute())
                .concatMap(result -> result.map((row, rowMetadata) -> {
                    log.debug("Result values");
                    final Integer id = row.get("key", Integer.class);
                    final String value = row.get("value", String.class);
                    return Tuples.of(id, value);
                }))
                .collectList();
    }
...
20230831 13:33:31.167 DEBUG [Test worker]          QUERY.debug(249) - Executing query: drop table if exists my_test cascade ; create table my_test(key int,value varchar(255),comment varchar(255))
20230831 13:33:31.171 WARN  [reactor-tcp-nio-1]    ReactorNettyClient.warn(289) - Notice: SEVERITY_LOCALIZED=NOTICE, SEVERITY_NON_LOCALIZED=NOTICE, CODE=00000, MESSAGE=table "my_test" does not exist, skipping, FILE=tablecmds.c, LINE=1186, ROUTINE=DropErrorMsgNonExistent
20230831 13:33:32.256 DEBUG [parallel-6]           StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:1
20230831 13:33:32.257 DEBUG [parallel-6]           StatementConnectionTest.batch(96) - start running
20230831 13:33:32.265 DEBUG [parallel-6]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.276 DEBUG [parallel-6]           StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:0
...
20230831 13:33:32.312 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.313 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.315 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$testMultipleStatementBatch2$5(81) - result:[[0,value_0], [1,value_1], [2,value_2], [3,value_3], [4,value_4], [5,value_5], [6,value_6], [7,value_7], [8,value_8], [9,value_9]]
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:12
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.batch(96) - start running
20230831 13:33:32.317 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.318 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.319 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.320 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.321 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.322 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.323 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batch$7(109) - Result values

Multiple batches

 @Test
    void runBatchParallelism() {
        // A single connection from dbService is shared by every batch in batchParallelism and is
        // closed once the combined Mono terminates.
        final Mono<List<List<Tuple2<Integer, String>>>> insertAll = Mono.usingWhen(
                dbService.createConnection(),
                connection -> batchParallelism(connection),
                Connection::close);
        final List<List<Tuple2<Integer, String>>> list = insertAll.block();
        log.debug("list:{}", gson.toJson(list));
    }

 /**
  * Runs the 10-row insert batch from {@link #batch(Connection)} five times on parallel rails and
  * gathers each batch's {@code (key, value)} list.
  *
  * <p>NOTE(review): every rail uses the same {@code connection}. The captured debug output for
  * this test shows the batches being built on parallel-N threads, but all of their results are
  * handled on reactor-tcp-nio-1 one after another, so the rails do not make the inserts run
  * concurrently. Presumably one connection per rail is needed for that; confirm.
  *
  * <p>{@code sequential()} merges the rails, so the outer list is in completion order, not in
  * index order.
  *
  * @param connection an open connection shared by all five batches (not closed here)
  * @return a Mono emitting one tuple list per batch
  */
 private Mono<List<List<Tuple2<Integer, String>>>> batchParallelism(Connection connection) {
        log.debug("start running");
        return Flux.range(0, 5)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(fluxIndex -> {
                    log.debug("flux index:{}", fluxIndex);
                    // Same INSERT ... RETURNING * batch and row mapping as batch(); reuse it instead of
                    // keeping a second copy of the SQL in sync.
                    return batch(connection);
                })
                .sequential()
                .collectList();
    }
...
20230831 14:05:08.608 DEBUG [Test worker]          StatementConnectionTest.batchParallelism(100) - start running
20230831 14:05:08.646 DEBUG [parallel-3]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:4
20230831 14:05:08.646 DEBUG [parallel-1]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:2
20230831 14:05:08.646 DEBUG [parallel-2]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:3
20230831 14:05:08.646 DEBUG [parallel-8]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:1
20230831 14:05:08.646 DEBUG [parallel-7]           StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:0
20230831 14:05:08.660 DEBUG [parallel-1]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.673 DEBUG [parallel-8]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.678 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.680 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.681 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.683 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.684 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.685 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.686 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.687 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [parallel-7]           QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.689 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.690 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.691 DEBUG [reactor-tcp-nio-1]    QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.694 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.696 DEBUG [reactor-tcp-nio-1]    StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
...

Upvotes: 1

Views: 154

Answers (0)

Related Questions