voipp

Reputation: 1447

How to execute multiple inserts in batch in r2dbc?

I need to insert multiple rows into one table in a single batch. In DatabaseClient I found the insert() statement and its using(Publisher objectToInsert) method, which accepts multiple objects. But would it insert them in one batch or not? Another possible solution is connection.createBatch(), but it has a drawback: I cannot pass my entity object to it, and I cannot generate the SQL query from the entity.

So, is it possible to create a batch insert in r2dbc?

Upvotes: 9

Views: 13615

Answers (4)

user23545691

Reputation: 1

This may be useful if you are on r2dbc-postgresql. Given an entity like this:

data class Entity(
    val entityId: UUID,
    val fdn: String? = null,
    val dateStart: LocalDate? = null,
    val dateEnd: LocalDate? = null
) {
    object Columns {
        const val ID = "id"
        const val FDN = "fdn"
        const val DATE_START = "date_start"
        const val DATE_END = "date_end"
    }
}

Then load the rows into the database with the PostgreSQL COPY command (through the JDBC driver's CopyManager) and Apache's commons-csv library:

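import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
import org.springframework.beans.factory.annotation.Autowired
// Package as in Spring Boot 2.x/3.x
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStreamWriter
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager

class PgCopyLoader @Autowired constructor(
    private val r2dbcProperties: R2dbcProperties
) {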

    suspend fun load(tableName: String, entities: List<Entity>): Long {
        return initConnection().use { connection ->
            connection.autoCommit = true

            val sql = "COPY $tableName FROM STDIN WITH (FORMAT csv, HEADER true)"
            val from = toCsv(entities)

            CopyManager(connection as BaseConnection).copyIn(sql, from)
        }
    }

    private fun initConnection(): Connection {
        // Swap only the leading scheme, e.g. r2dbc:postgresql://... -> jdbc:postgresql://...
        val url = r2dbcProperties.url.replaceFirst("r2dbc", "jdbc")
        val user = r2dbcProperties.username
        val password = r2dbcProperties.password

        return DriverManager.getConnection(url, user, password)
    }

    private fun toCsv(entities: List<Entity>): InputStream {
        val baos = ByteArrayOutputStream()

        OutputStreamWriter(baos, charset).use { writer ->
            CSVPrinter(writer, format).use { printer ->
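                // Header row; COPY skips it because of HEADER true
                printer.printRecord(
                    Entity.Columns.ID, Entity.Columns.FDN,
                    Entity.Columns.DATE_START, Entity.Columns.DATE_END
                )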

                entities.forEach { e ->
                    printer.printRecord(
                        e.entityId, e.fdn, e.dateStart, e.dateEnd
                    )
                }
            }
        }

        return ByteArrayInputStream(baos.toByteArray())
    }

    companion object {
        private val format = CSVFormat.POSTGRESQL_CSV
        private val charset = StandardCharsets.UTF_8
    }
}

See also:

Upvotes: 0

Ghostwritertje

Reputation: 31

Unfortunately, there is still no built-in support. The only issue I had with @Hantsy's answer was that I got the error "No unfinished bindings!".

Below is an adjusted, working solution (no statement.add() after the last binding!):

    public class Post {
        @Id
        private Integer id;
        private String title;
        private String post;
    }
    
    public Flux<Post> saveAll(List<Post> postList) {
        return databaseClient.inConnectionMany(connection -> {
            Statement statement = connection.createStatement("insert into POST (TITLE, POST) " +
                            " VALUES ($1, $2)")
                    .returnGeneratedValues("ID", "TITLE", "POST");
            for (int i = 0; i < postList.size(); ) {
                Post post = postList.get(i);
                statement.bind(0, post.getTitle())
                        .bind(1, post.getPost());
                if (++i < postList.size()) {
                    statement.add();
                }
            }

            return Flux.from(statement.execute())
                    .flatMap(result -> (Flux<Post>) result.map((row, rowMetadata) -> {
                        Integer id = row.get("ID", Integer.class);
                        String post = row.get("POST", String.class);
                        String title = row.get("TITLE", String.class);
                        // assumes a Post(id, title, post) constructor, matching the field order above
                        return new Post(id, title, post);
                    }));
        });
    }
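
The rule in the loop above (call add() between binding sets, but not after the last one) can be isolated into a small helper. The following is only a sketch: BindingLoop and bindAll are hypothetical names, not part of R2DBC, and the binding and add steps are passed in as plain callbacks so the example runs without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BindingLoop {

    // Hypothetical helper (not part of R2DBC): runs bindOne for every item and
    // calls add between consecutive items, never after the last one.
    public static <T> void bindAll(List<T> items, Consumer<T> bindOne, Runnable add) {
        for (int i = 0; i < items.size(); i++) {
            if (i > 0) {
                add.run(); // close the previous binding set before starting the next
            }
            bindOne.accept(items.get(i));
        }
    }

    public static void main(String[] args) {
        // Record the call order instead of talking to a real Statement.
        List<String> calls = new ArrayList<>();
        bindAll(List.of("a", "b", "c"),
                item -> calls.add("bind " + item),
                () -> calls.add("add"));
        System.out.println(calls); // prints [bind a, add, bind b, add, bind c]
    }
}
```

Under the same assumptions, the loop in saveAll could then be written as BindingLoop.bindAll(postList, p -> statement.bind(0, p.getTitle()).bind(1, p.getPost()), statement::add);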

Upvotes: 0

Mirro Mutth

Reputation: 340

There are two questions:

Would DatabaseClient.insert() insert them in one batch or not?

Not a batch.

Is it possible to create batch insert in r2dbc? (except Connection.createBatch())

No. For now, Connection.createBatch() is the only way to create a Batch.
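
Separately from Batch, and not something this answer proposes: when the aim is simply fewer round trips, a single parameterized INSERT with several VALUES tuples is sometimes used instead. The sketch below only builds that SQL text with PostgreSQL-style $n placeholders. The class MultiRowInsert and its sql method are hypothetical helpers, not part of R2DBC or Spring, and the posts table with title and content columns is borrowed from elsewhere on this page purely for illustration.

```java
import java.util.List;
import java.util.StringJoiner;

public class MultiRowInsert {

    // Hypothetical helper: builds one INSERT with rowCount VALUES tuples.
    // Table and column names are concatenated as-is, so they must come from
    // trusted constants, never from user input.
    public static String sql(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        StringJoiner rows = new StringJoiner(", ");
        int placeholder = 1; // PostgreSQL placeholders start at $1
        for (int r = 0; r < rowCount; r++) {
            StringJoiner row = new StringJoiner(", ", "(", ")");
            for (int c = 0; c < columns.size(); c++) {
                row.add("$" + placeholder++);
            }
            rows.add(row.toString());
        }
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES " + rows;
    }

    public static void main(String[] args) {
        System.out.println(sql("posts", List.of("title", "content"), 2));
        // prints INSERT INTO posts (title, content) VALUES ($1, $2), ($3, $4)
    }
}
```

With zero-based bind indexes, as used in the other answers here, the value for row r and column c would be bound at index r * columns.size() + c. Whether this beats bind/add for a given driver is something to measure rather than assume.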

See also issues:

Upvotes: 7

Hantsy

Reputation: 9261

There is no direct support so far, but I found that you can work around this by using the Connection directly; see the issue spring-data-r2dbc#259.

A Statement has an add() method that saves the current set of bound parameters, so you can bind another set for the same statement.

The complete code of my solution can be found here.

        return this.databaseClient.inConnectionMany(connection -> {

            var statement = connection.createStatement("INSERT INTO  posts (title, content) VALUES ($1, $2)")
                    .returnGeneratedValues("id");

            for (var p : data) {
                statement.bind(0, p.getTitle()).bind(1, p.getContent()).add();
            }

            return Flux.from(statement.execute()).flatMap(result -> result.map((row, rowMetadata) -> row.get("id", UUID.class)));
        });

Here is a test for this method:

    @Test
    public void testSaveAll() {

        var data = Post.builder().title("test").content("content").build();
        var data1 = Post.builder().title("test1").content("content1").build();

        var result = posts.saveAll(List.of(data, data1)).log("[Generated result]")
                .doOnNext(id->log.info("generated id: {}", id));

        assertThat(result).isNotNull();
        result.as(StepVerifier::create)
                .expectNextCount(2)
                .verifyComplete();
    }

The generated ids are printed as expected in the console.

...
2020-10-08 11:29:19,662 INFO [reactor-tcp-nio-2] reactor.util.Loggers$Slf4JLogger:274 onNext(a3105647-a4bc-4986-9ad4-1e6de901449f)
2020-10-08 11:29:19,664 INFO [reactor-tcp-nio-2] com.example.demo.PostRepositoryTest:31 generated id: a3105647-a4bc-4986-9ad4-1e6de901449f
//.....
2020-10-08 11:29:19,671 INFO [reactor-tcp-nio-2] reactor.util.Loggers$Slf4JLogger:274 onNext(a611d766-f983-4c8e-9dc9-fc78775911e5)
2020-10-08 11:29:19,671 INFO [reactor-tcp-nio-2] com.example.demo.PostRepositoryTest:31 generated id: a611d766-f983-4c8e-9dc9-fc78775911e5
//......

Process finished with exit code 0

Upvotes: 9
