Reputation: 11
I am trying to batch-persist a Flux stream of articles into Postgres via R2DBC.
The application has two identical article tables (article_origin and article_destiny). The origin table holds a million articles, which I am trying to batch-insert into the empty article_destiny table. A web client calls an article endpoint and requests a single Flux stream. The application works without the batch insert, but it takes more than 20 minutes to persist all the articles one by one.
The articles arrive at articleDestinyBatchStream, and the buffer (with the back-pressure value as its argument) produces the batch-size chunks.
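The chunking that the buffer step performs can be sketched in plain Java, without Reactor. This is only an illustration of the batching idea under the assumption of a finite list; the class and method names (BatchChunks, chunk) are hypothetical and not part of the application.

```java
import java.util.ArrayList;
import java.util.List;

class BatchChunks {
    // Splits items into consecutive sublists of at most batchSize elements.
    // The last sublist may be smaller, as with a buffered finite stream.
    static <T> List<List<T>> chunk(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        // Two full batches followed by one partial batch.
        System.out.println(chunk(ids, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Each resulting sublist corresponds to one List of articles handed to the batch-persist method.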
But the .map(connection -> { lambda is skipped, for a reason I do not understand.
Please help.
The relevant code snippets follow.
The web client:
public void getArticleBatchStreamToOrigin(Long backPressure) {
try {
.uri("/article-batch-stream-from-origin/" + backPressure)
.subscribe(articles -> {
streamService.articleDestinyBatchStream(articles);
log.info("at client article batch size={} ", articles.size());
} catch (WebClientResponseException wcre) {
log.error("Error Response Code is {} and Response Body is {}", wcre.getRawStatusCode(), wcre.getResponseBodyAsString());
log.error("Exception in method getArticleStream()", wcre);
throw wcre;
} catch (Exception ex) {
log.error("Exception in method getArticleStream()", ex);
throw ex;
The article streaming service (one million articles) called by the client
public Flux<ArticleOrigin> getAllArticleStreamFromOrigin(Long backPressure) {
return articleOriginRepository
.doOnNext(a -> log.debug("at stream service article id={}", a.getId()));
The article batch persist:
public Mono<Void> articleDestinyBatchStream(List<ArticleDestiny> articles) {
log.info("connectionFactory {}", connectionFactory.getMetadata().getName());
return Mono.from(connectionFactory.create())
.map(connection -> {
Batch batch = connection.createBatch();
for (ArticleDestiny article :articles) {
batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) " +
"values ('${article.countryCode}','${article.artNumber}', '${article.name}', '${article.price}')");
return batch.execute();
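One thing to watch in the snippet above: Java string literals do not interpolate ${...} placeholders, so the text would reach the database literally. A minimal sketch of building the statement with String.format follows. The Article class is a hypothetical stand-in for ArticleDestiny with assumed field names, and the quote-doubling is only basic escaping, not a substitute for bound parameters.

```java
import java.math.BigDecimal;

class InsertSqlSketch {
    // Hypothetical stand-in for the ArticleDestiny entity; field names are assumed.
    static final class Article {
        final String countryCode;
        final String artNumber;
        final String name;
        final BigDecimal price;

        Article(String countryCode, String artNumber, String name, BigDecimal price) {
            this.countryCode = countryCode;
            this.artNumber = artNumber;
            this.name = name;
            this.price = price;
        }
    }

    // Wraps a value in single quotes and doubles embedded quotes,
    // so a name such as O'Neill does not end the SQL literal early.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds one INSERT statement with the article's values filled in.
    static String insertSql(Article a) {
        return String.format(
            "INSERT INTO article_destiny (country_code, art_number, name, price) VALUES (%s, %s, %s, %s)",
            quote(a.countryCode), quote(a.artNumber), quote(a.name), a.price.toPlainString());
    }

    public static void main(String[] args) {
        Article article = new Article("DE", "123", "O'Neill", new BigDecimal("23.23"));
        System.out.println(insertSql(article));
    }
}
```

A string built this way can be passed to batch.add(...), since an R2DBC Batch takes plain SQL strings. For untrusted input, a parameterized Statement with bound values is the safer route.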
Here are the config details:
@SpringBootApplication(exclude = { R2dbcDataAutoConfiguration.class, R2dbcAutoConfiguration.class })
Next, the config class that extends AbstractR2dbcConfiguration:
public ConnectionFactoryInitializer initialize(ConnectionFactory connectionFactory) {
var initializer = new ConnectionFactoryInitializer();
CompositeDatabasePopulator populator = new CompositeDatabasePopulator();
new ResourceDatabasePopulator(new ClassPathResource("/db-config/schema.sql"))
return initializer;
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
Another attempt to create the batches:
public Mono<Void> articleDestinyBatchStream(List<ArticleDestiny> articles) {
return Mono.from(connectionFactory.create())
.flatMap(c -> {
Batch batch = c.createBatch();
for (ArticleDestiny article :articles) {
batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) " +
"values ('${article.countryCode}','${article.artNumber}', '${article.name}', '${article.price}')");
return Mono.empty();
Reputation: 11
The solution I found works on a streaming article Flux, but with a single connection.
The details follow.
First, the router:
public RouterFunction<ServerResponse> root(ArticleHandler articleHandler) {
return RouterFunctions.route()
.GET("/article-stream-to-origin", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamToOrigen)
.GET("/article-stream-from-origin/{backPressure}", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamFromOrigin)
.GET("/article-batch-stream-from-origin/{backPressure}", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamFromOrigin)
.GET("/articles", RequestPredicates.accept(MediaType.APPLICATION_JSON), articleHandler::getAllArticles)
The handler
public Mono<ServerResponse> getAllArticles(ServerRequest request) {
var all = articleService.getAllArticles(Sort.by("id"));
return ok().body(fromPublisher(all, ArticleDestiny.class));
public Mono<ServerResponse> getArticleStreamFromOrigin(ServerRequest request) {
long backPressure = Long.valueOf(request.pathVariable("backPressure"));
Flux<ArticleOrigin> all = streamService.getAllArticleStreamFromOrigin(backPressure);
.body(fromPublisher(all, ArticleOrigin.class));
The client that calls the all-articles service
public Mono<Void> getArticleBatchStreamToOrigin(Long backPressure) {
Flux<ArticleDestiny> articleFlux = client.get()
.uri("/article-batch-stream-from-origin/" + backPressure)
streamService.articleDestinyBatchStream(articleFlux, backPressure);
return Mono.empty();
And finally, the service with a single (blocking) connection:
public Mono<Void> articleDestinyBatchStream(Flux<ArticleDestiny> articles, Long backPressure) {
Connection con = Mono.from(connectionFactory.create()).block();
articles
.bufferTimeout(backPressure.intValue(), Duration.ofMillis(1000L))
.doOnNext(a -> log.info("article batch size: " + a.size()))
.flatMap(a -> {
log.info("inside " + a.size());
Batch batch = con.createBatch();
a.forEach(article -> {
String sql = String.format("INSERT INTO article_destiny (country_code,art_number, name, price) VALUES ('%s','%s','%s','%s')",
log.debug("print each batch: " + batch);
return Mono.from(batch.execute()).doFinally((st -> con.close()));
return Mono.empty();
Any suggestions on how to avoid this blocking DB connection?
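One general way to avoid the block() call is to compose on the asynchronous connection instead of waiting for it, and to release it once after all batches have finished. The sketch below shows that shape with the JDK's CompletableFuture as an analogy only; FakeConnection, openConnection and insertAll are hypothetical names, not R2DBC or Reactor API. Reactor offers usingWhen for this kind of acquire/use/release scoping, though whether it fits this service is an assumption. Note also that in the snippet above con.close() runs after every batch, and the Publisher it returns is never subscribed to.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class NonBlockingConnectionSketch {
    // Hypothetical stand-in for a database connection; not a real driver class.
    static final class FakeConnection {
        final AtomicInteger closeCalls = new AtomicInteger();

        // Pretends to run a batch and reports how many statements it contained.
        CompletableFuture<Integer> executeBatch(List<String> statements) {
            return CompletableFuture.supplyAsync(statements::size);
        }

        void close() {
            closeCalls.incrementAndGet();
        }
    }

    static CompletableFuture<FakeConnection> openConnection() {
        return CompletableFuture.supplyAsync(FakeConnection::new);
    }

    // Waits for the connection without blocking, runs the batches one after
    // another on it, then closes it exactly once, on success or on failure.
    static CompletableFuture<Integer> insertAll(CompletableFuture<FakeConnection> connection,
                                                List<List<String>> batches) {
        return connection.thenCompose(con -> {
            CompletableFuture<Integer> total = CompletableFuture.completedFuture(0);
            for (List<String> batch : batches) {
                total = total.thenCompose(sum -> con.executeBatch(batch).thenApply(n -> sum + n));
            }
            return total.whenComplete((result, error) -> con.close());
        });
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(List.of("INSERT 1", "INSERT 2"), List.of("INSERT 3"));
        // join() is used only at the edge of this demo to wait for the result.
        int inserted = insertAll(openConnection(), batches).join();
        System.out.println("statements executed: " + inserted);
    }
}
```

The key design choice is that the connection never leaves the asynchronous pipeline, so no thread has to wait for it and the cleanup is tied to the completion of the whole run rather than to each batch.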
Reputation: 2806
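It is skipped because the reactive chain has no subscriber: nothing in a Reactor pipeline runs until something subscribes to it, and the Mono returned by articleDestinyBatchStream is never subscribed to.
Try the code snippet below.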
public void articleDestinyBatchStream(List<String> articles) {
log.info("connectionFactory {}", connectionFactory.getMetadata().getName());
Mono.from(connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection.createBatch()
.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('DE', '123', 'some name', '23.23')")
.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('BE', '456', 'some other name', '44.44')")
.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('NL', '789', 'some new name', '55.55')")
The then() statement discards whatever the return value is and just signals to the calling client when everything is done.
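The point that nothing runs without a subscriber can be seen in a plain-JDK analogy. The sketch below uses java.util.stream, not Reactor: a Stream's map step is equally lazy and only runs once a terminal operation is invoked, much as a Reactor map or flatMap only runs after subscribe. The names LazyPipelineSketch and describe are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineSketch {
    // Declares a pipeline whose map step counts how often it is invoked.
    // Declaring it does not execute the lambda.
    static Stream<String> describe(List<Integer> ids, AtomicInteger mapCalls) {
        return ids.stream().map(id -> {
            mapCalls.incrementAndGet();
            return "article-" + id;
        });
    }

    public static void main(String[] args) {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream<String> pipeline = describe(List.of(1, 2, 3), mapCalls);

        // The pipeline exists, but the map lambda has not been called yet.
        System.out.println("before terminal operation: " + mapCalls.get()); // prints 0

        // The terminal operation plays the role that subscribe() plays in Reactor.
        List<String> names = pipeline.collect(Collectors.toList());
        System.out.println("after terminal operation: " + mapCalls.get()); // prints 3
        System.out.println(names);
    }
}
```

In the question's code the analogous missing step is a subscription to the Mono that articleDestinyBatchStream returns, or returning it to a caller that subscribes.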