Reputation: 683
I'm trying to insert data in parallel into a PostgreSQL table using Spring R2DBC.
No matter how I run it — either over different connections, or over the same connection with batches/statements issued in parallel —
it always reverts to a single thread (reactor-tcp-nio-1) and runs serially rather than in parallel, which hurts performance.
Multiple connections
/**
 * Runs 100 batches, each on a connection of its own, with the indices spread
 * over the parallel scheduler, and logs every batch result plus the combined list.
 */
@Test
void testMultipleStatementBatch2() {
    // Per index: obtain a connection, run one batch on it, close the connection afterwards.
    final Flux<List<Tuple2<Integer, String>>> batchResults = Flux.range(0, 100)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(index -> Flux.usingWhen(
                    connectionFactory.create(),
                    connection -> {
                        log.debug("Running index:{}", index);
                        return batch(connection);
                    },
                    Connection::close))
            .doOnNext(rows -> log.debug("result:{}", rows))
            .sequential();

    // Block the test thread until every batch has reported its rows.
    final List<List<Tuple2<Integer, String>>> result = batchResults
            .collectList()
            .block();

    log.debug("list:{}", gson.toJson(result));
}
/**
 * Queues ten {@code INSERT ... RETURNING *} statements (keys 0 to 9) as one batch on the
 * given connection and executes it.
 *
 * @param connection connection the batch is created on
 * @return a Mono emitting the (key, value) pair of every returned row, collected into a list
 */
private Mono<List<Tuple2<Integer, String>>> batch(Connection connection) {
    log.debug("start running");

    final Batch inserts = connection.createBatch();
    for (int i = 0; i < 10; i++) {
        inserts.add("INSERT INTO my_test(key,value,comment) values ('" + i + "'"
                + ",'value_" + i + "'"
                + ",'comment_" + i + "'"
                + ") RETURNING *");
    }

    return Flux.from(inserts.execute())
            .flatMap(result -> result.map((row, rowMetadata) -> {
                log.debug("Result values");
                // Only the key and value columns are kept; comment is not read back.
                return Tuples.of(row.get("key", Integer.class), row.get("value", String.class));
            }))
            .collectList();
}
...
20230831 13:33:31.167 DEBUG [Test worker] QUERY.debug(249) - Executing query: drop table if exists my_test cascade ; create table my_test(key int,value varchar(255),comment varchar(255))
20230831 13:33:31.171 WARN [reactor-tcp-nio-1] ReactorNettyClient.warn(289) - Notice: SEVERITY_LOCALIZED=NOTICE, SEVERITY_NON_LOCALIZED=NOTICE, CODE=00000, MESSAGE=table "my_test" does not exist, skipping, FILE=tablecmds.c, LINE=1186, ROUTINE=DropErrorMsgNonExistent
20230831 13:33:32.256 DEBUG [parallel-6] StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:1
20230831 13:33:32.257 DEBUG [parallel-6] StatementConnectionTest.batch(96) - start running
20230831 13:33:32.265 DEBUG [parallel-6] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.276 DEBUG [parallel-6] StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:0
...
20230831 13:33:32.312 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.313 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.314 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.315 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$testMultipleStatementBatch2$5(81) - result:[[0,value_0], [1,value_1], [2,value_2], [3,value_3], [4,value_4], [5,value_5], [6,value_6], [7,value_7], [8,value_8], [9,value_9]]
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$testMultipleStatementBatch2$3(76) - Running index:12
20230831 13:33:32.316 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.batch(96) - start running
20230831 13:33:32.317 DEBUG [reactor-tcp-nio-1] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 13:33:32.318 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.319 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.320 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.321 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.322 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
20230831 13:33:32.323 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batch$7(109) - Result values
Multiple batches
/**
 * Obtains a single connection from {@code dbService}, runs {@code batchParallelism} on it,
 * closes the connection, and logs the collected result as JSON.
 */
@Test
void runBatchParallelism() {
    var pipeline = Mono.usingWhen(
            dbService.createConnection(),
            this::batchParallelism,
            Connection::close);

    // Block the test thread until the whole pipeline has completed.
    var list = pipeline.block();

    log.debug("list:{}", gson.toJson(list));
}
/**
 * Spreads five indices over the parallel scheduler; for each index a batch of ten
 * {@code INSERT ... RETURNING *} statements (keys 0 to 9) is created and executed on the
 * one connection passed in, which all indices share.
 *
 * @param connection connection every batch is created on
 * @return a Mono emitting one list of (key, value) pairs per batch
 */
private Mono<List<List<Tuple2<Integer, String>>>> batchParallelism(Connection connection) {
    log.debug("start running");

    return Flux.range(0, 5)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(fluxIndex -> {
                log.debug("flux index:{}", fluxIndex);

                final Batch inserts = connection.createBatch();
                for (int i = 0; i < 10; i++) {
                    inserts.add("INSERT INTO my_test(key,value,comment) values ('" + i + "'"
                            + ",'value_" + i + "'"
                            + ",'comment_" + i + "'"
                            + ") RETURNING *");
                }

                return Flux.from(inserts.execute())
                        .flatMap(result -> result.map((row, rowMetadata) -> {
                            log.debug("Result values");
                            // Only the key and value columns are kept; comment is not read back.
                            return Tuples.of(
                                    row.get("key", Integer.class),
                                    row.get("value", String.class));
                        }))
                        .collectList();
            })
            .sequential()
            .collectList();
}
...
20230831 14:05:08.608 DEBUG [Test worker] StatementConnectionTest.batchParallelism(100) - start running
20230831 14:05:08.646 DEBUG [parallel-3] StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:4
20230831 14:05:08.646 DEBUG [parallel-1] StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:2
20230831 14:05:08.646 DEBUG [parallel-2] StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:3
20230831 14:05:08.646 DEBUG [parallel-8] StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:1
20230831 14:05:08.646 DEBUG [parallel-7] StatementConnectionTest.lambda$batchParallelism$9(103) - flux index:0
20230831 14:05:08.660 DEBUG [parallel-1] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.673 DEBUG [parallel-8] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.678 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.680 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.681 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.683 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.684 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.685 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.686 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.687 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.688 DEBUG [parallel-7] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.689 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.690 DEBUG [reactor-tcp-nio-1] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.691 DEBUG [reactor-tcp-nio-1] QUERY.debug(249) - Executing query: INSERT INTO my_test(key,value,comment) values ('0','value_0','comment_0') RETURNING *; INSERT INTO my_test(key,value,comment) values ('1','value_1','comment_1') RETURNING *; INSERT INTO my_test(key,value,comment) values ('2','value_2','comment_2') RETURNING *; INSERT INTO my_test(key,value,comment) values ('3','value_3','comment_3') RETURNING *; INSERT INTO my_test(key,value,comment) values ('4','value_4','comment_4') RETURNING *; INSERT INTO my_test(key,value,comment) values ('5','value_5','comment_5') RETURNING *; INSERT INTO my_test(key,value,comment) values ('6','value_6','comment_6') RETURNING *; INSERT INTO my_test(key,value,comment) values ('7','value_7','comment_7') RETURNING *; INSERT INTO my_test(key,value,comment) values ('8','value_8','comment_8') RETURNING *; INSERT INTO my_test(key,value,comment) values ('9','value_9','comment_9') RETURNING *
20230831 14:05:08.694 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
20230831 14:05:08.696 DEBUG [reactor-tcp-nio-1] StatementConnectionTest.lambda$batchParallelism$7(116) - Result values
...
Upvotes: 1
Views: 154