artsgard

Reputation: 11

Postgres r2dbc batch insert problems

I am trying to batch-persist a Flux stream of articles into a Postgres database over R2DBC.

The application has two identical article tables (article_origin and article_destiny). The origin table has a million articles, which I try to batch-insert into the empty article_destiny table. A web client calls an article endpoint and requests a single Flux stream. The application works without the batch insert, but takes more than 20 minutes to persist all the articles one by one.

The articles arrive at articleDestinyBatchStream, and the buffer operator (with the back-pressure value as its argument) produces the batch-sized chunks.

But the .map(connection -> { ... }) block is skipped, for a reason I do not understand!

Please help

The relevant code snippets follow.

The web client:

public void getArticleBatchStreamToOrigin(Long backPressure) {
    try {
        client.get()
                .uri("/article-batch-stream-from-origin/" + backPressure)
                .accept(MediaType.TEXT_EVENT_STREAM) // APPLICATION_NDJSON TEXT_EVENT_STREAM
                .retrieve()
                .bodyToFlux(ArticleDestiny.class)
                .buffer(backPressure.intValue())
                .subscribe(articles -> {
                    streamService.articleDestinyBatchStream(articles);
                    log.info("at client article batch size={} ", articles.size());
                });
    } catch (WebClientResponseException wcre) {
        log.error("Error Response Code is {} and Response Body is {}", wcre.getRawStatusCode(), wcre.getResponseBodyAsString());
        log.error("Exception in method getArticleBatchStreamToOrigin()", wcre);
        throw wcre;
    } catch (Exception ex) {
        log.error("Exception in method getArticleBatchStreamToOrigin()", ex);
        throw ex;
    }
}
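As an aside, buffer(n) simply groups the incoming elements into consecutive lists of at most n items. In plain Java the chunking logic looks roughly like this (hypothetical illustration, not part of the application):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of what Flux.buffer(n) does to the element stream:
// it partitions the sequence into consecutive chunks of at most n elements,
// with the last chunk possibly smaller.
public class ChunkDemo {

    static <T> List<List<T>> chunk(List<T> items, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return chunks;
    }
}
```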

The article streaming service (one million articles) called by the client:

public Flux<ArticleOrigin> getAllArticleStreamFromOrigin(Long backPressure) {
    return articleOriginRepository
            .findAll()
            .doOnNext(a -> log.debug("at stream service article id={}", a.getId()));
}

The article batch persist:

public Mono<Void> articleDestinyBatchStream(List<ArticleDestiny> articles) {
    log.info("connectionFactory {}", connectionFactory.getMetadata().getName());

    return Mono.from(connectionFactory.create())
            .map(connection -> {
                Batch batch = connection.createBatch();

                for (ArticleDestiny article : articles) {
                    batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) " +
                            "values ('${article.countryCode}', '${article.artNumber}', '${article.name}', '${article.price}')");
                }

                return batch.execute();
            }).then();
}
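Note that Java string literals do not interpolate ${...} placeholders, so the statements above would insert the placeholder text literally. A minimal plain-Java sketch of building each INSERT with the real values (hypothetical helper, not from the application; single quotes doubled to keep the SQL valid):

```java
// Hypothetical helper: builds one INSERT statement with literal values.
// Doubling single quotes keeps the SQL well-formed; in real code,
// parameterized statements via connection.createStatement(...).bind(...)
// would be the safer choice.
public class InsertSqlBuilder {

    static String escape(String value) {
        return value.replace("'", "''");
    }

    static String insertSql(String countryCode, String artNumber, String name, String price) {
        return String.format(
                "INSERT INTO article_destiny (country_code, art_number, name, price) "
                        + "VALUES ('%s', '%s', '%s', '%s')",
                escape(countryCode), escape(artNumber), escape(name), escape(price));
    }
}
```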

Here are the configuration details:

@SpringBootApplication(exclude = { R2dbcDataAutoConfiguration.class, R2dbcAutoConfiguration.class })

and the config class that extends AbstractR2dbcConfiguration:

@Bean
public ConnectionFactoryInitializer initialize(ConnectionFactory connectionFactory) {
    var initializer = new ConnectionFactoryInitializer();
    initializer.setConnectionFactory(connectionFactory);
    CompositeDatabasePopulator populator = new CompositeDatabasePopulator();
    populator.addPopulators(
            new ResourceDatabasePopulator(new ClassPathResource("/db-config/schema.sql"))
    );
    initializer.setDatabasePopulator(populator);
    return initializer;
}

@Override
@Primary
@Bean
public ConnectionFactory connectionFactory() {

    return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
            .host("localhost")
            .port(5677)
            .username("admin")
            .password("secret")
            .database("article_flux_db")
            .build());
}
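For comparison, if R2dbcAutoConfiguration were not excluded, roughly the same connection could be configured through Spring Boot properties instead of a manual factory bean (hypothetical values mirroring the builder above):

```yaml
# application.yaml — property-based equivalent of the PostgresqlConnectionFactory builder
# (only takes effect when R2dbcAutoConfiguration is NOT excluded)
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5677/article_flux_db
    username: admin
    password: secret
```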

Another attempt to create the batches:

public Mono<Void> articleDestinyBatchStream(List<ArticleDestiny> articles) {

    return Mono.from(connectionFactory.create())
            .flatMap(c -> {
                Batch batch = c.createBatch();
                for (ArticleDestiny article : articles) {
                    batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) " +
                            "values ('${article.countryCode}', '${article.artNumber}', '${article.name}', '${article.price}')");
                }
                return Mono.empty();
            }).then();
}

Upvotes: 0

Views: 1040

Answers (2)

artsgard

Reputation: 11

The solution I found works on a streaming article flux, but with a single connection. The details follow.

First, the router:

@Bean
public RouterFunction<ServerResponse> root(ArticleHandler articleHandler) {
    return RouterFunctions.route()
            .GET("/article-stream-to-origin", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamToOrigen)
            .GET("/article-stream-from-origin/{backPressure}", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamFromOrigin)
            .GET("/article-batch-stream-from-origin/{backPressure}", RequestPredicates.accept(MediaType.valueOf(MediaType.APPLICATION_NDJSON_VALUE)), articleHandler::getArticleStreamFromOrigin)
            .GET("/articles", RequestPredicates.accept(MediaType.APPLICATION_JSON), articleHandler::getAllArticles)
            .build();
}
The handler:

public Mono<ServerResponse> getAllArticles(ServerRequest request) {
    var all = articleService.getAllArticles(Sort.by("id"));
    return ok().body(fromPublisher(all, ArticleDestiny.class));
}

public Mono<ServerResponse> getArticleStreamFromOrigin(ServerRequest request) {
    long backPressure = Long.parseLong(request.pathVariable("backPressure"));
    Flux<ArticleOrigin> all = streamService.getAllArticleStreamFromOrigin(backPressure);

    return ok().contentType(MediaType.APPLICATION_NDJSON) // TEXT_EVENT_STREAM APPLICATION_NDJSON
            .body(fromPublisher(all, ArticleOrigin.class));
}

The client that calls the all-articles service:

public Mono<Void> getArticleBatchStreamToOrigin(Long backPressure) {

    Flux<ArticleDestiny> articleFlux = client.get()
            .uri("/article-batch-stream-from-origin/" + backPressure)
            .accept(MediaType.APPLICATION_NDJSON) // APPLICATION_NDJSON TEXT_EVENT_STREAM
            .retrieve()
            .bodyToFlux(ArticleDestiny.class);

    streamService.articleDestinyBatchStream(articleFlux, backPressure);
    return Mono.empty();
}

And finally, the service with a single (blocking) connection:

public Mono<Void> articleDestinyBatchStream(Flux<ArticleDestiny> articles, Long backPressure) {
    Connection con = Mono.from(connectionFactory.create()).block();
    articles
            .bufferTimeout(backPressure.intValue(), Duration.ofMillis(1000L))
            .publishOn(Schedulers.newParallel("main"))
            .doOnNext(a -> log.info("article batch size: " + a.size()))
            .flatMap(a -> {
                log.info("inside " + a.size());

                Batch batch = con.createBatch();

                a.forEach(article -> {
                    String sql = String.format("INSERT INTO article_destiny (country_code, art_number, name, price) VALUES ('%s', '%s', '%s', '%s')",
                            article.getCountryCode(),
                            article.getArtNumber(),
                            article.getName(),
                            article.getPrice());
                    batch.add(sql);
                });

                log.debug("print each batch: " + batch);

                return Mono.from(batch.execute()).doFinally(st -> con.close());
            }).subscribe();

    return Mono.empty();
}

Any comments on how to solve this blocking DB connection?

Upvotes: 0

SRJ

Reputation: 2806

It is skipped because nothing subscribes to the reactive chain after it is assembled.

Try the code snippet below.

public Mono<Void> articleDestinyBatchStream(List<String> articles) {
    log.info("connectionFactory {}", connectionFactory.getMetadata().getName());

    return Mono.from(connectionFactory.create())
            .flatMapMany(connection -> Flux.from(connection.createBatch()
                    .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('DE', '123', 'some name', '23.23')")
                    .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('BE', '456', 'some other name', '44.44')")
                    .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('NL', '789', 'some new name', '55.55')")
                    .execute()))
            .then();
}

The then() operator discards whatever values are emitted and just signals the calling client when everything is done. The caller still has to subscribe to the returned Mono (or hand it back to the framework) for the inserts to actually run.

Upvotes: 0
