Reputation: 1447
I need to insert multiple rows into one table in one batch. In DatabaseClient I found the insert() statement and the using(Publisher objectToInsert) method, which accepts multiple objects as its argument. But would it insert them in one batch or not? Another possible solution is connection.createBatch(), but it has a drawback: I cannot pass my entity object to it, and I cannot generate the SQL query from the entity.
So, is it possible to create a batch insert in r2dbc?
Upvotes: 9
Views: 13615
Reputation: 1
It may be useful for r2dbc-postgresql.
import java.time.LocalDate
import java.util.UUID

data class Entity(
    val entityId: UUID,
    val fdn: String? = null,
    val dateStart: LocalDate? = null,
    val dateEnd: LocalDate? = null
) {
    // Column names, used as the CSV header row
    object Columns {
        const val ID = "id"
        const val FDN = "fdn"
        const val DATE_START = "date_start"
        const val DATE_END = "date_end"
    }
}
Then load the data into the DB using the Postgres COPY command and Apache's commons-csv library:
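import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStreamWriter
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager

class PgCopyLoader @Autowired constructor(
    private val r2dbcProperties: R2dbcProperties
) {
    // Note: the JDBC calls below are blocking
    suspend fun load(tableName: String, entities: List<Entity>): Long {
        return initConnection().use { connection ->
            connection.autoCommit = true
            // HEADER true makes COPY skip the first CSV line
            val sql = "COPY $tableName FROM STDIN WITH (FORMAT csv, HEADER true)"
            val from = toCsv(entities)
            // copyIn returns the number of rows loaded
            CopyManager(connection as BaseConnection).copyIn(sql, from)
        }
    }

    // Opens a plain JDBC connection using the R2DBC settings
    private fun initConnection(): Connection {
        // Replace only the scheme prefix, not every "r2dbc" in the URL
        val url = r2dbcProperties.url.replaceFirst("r2dbc:", "jdbc:")
        val user = r2dbcProperties.username
        val password = r2dbcProperties.password
        return DriverManager.getConnection(url, user, password)
    }

    // Serializes the entities to an in-memory CSV; columns must be in table order
    private fun toCsv(entities: List<Entity>): InputStream {
        val baos = ByteArrayOutputStream()
        OutputStreamWriter(baos, charset).use { writer ->
            CSVPrinter(writer, format).use { printer ->
                printer.printRecord(
                    Entity.Columns.ID, Entity.Columns.FDN,
                    Entity.Columns.DATE_START, Entity.Columns.DATE_END
                )
                entities.forEach { e ->
                    printer.printRecord(
                        e.entityId, e.fdn, e.dateStart, e.dateEnd
                    )
                }
            }
        }
        return ByteArrayInputStream(baos.toByteArray())
    }

    companion object {
        private val format = CSVFormat.POSTGRESQL_CSV
        private val charset = StandardCharsets.UTF_8
    }
}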
Upvotes: 0
Reputation: 31
Unfortunately, there is still no built-in support.
The only issue I had with @Hantsy's answer was that I got the error "No unfinished bindings!".
Below is an adjusted working solution (no statement.add() after the last binding):
public class Post {
    @Id
    private Integer id;
    private String title;
    private String post;
    // Getters and an all-args constructor (id, title, post) are assumed here
}

public Flux<Post> saveAll(List<Post> postList) {
    return databaseClient.inConnectionMany(connection -> {
        Statement statement = connection.createStatement("insert into POST (TITLE, POST) " +
                        " VALUES ($1, $2)")
                .returnGeneratedValues("ID", "TITLE", "POST");
        for (int i = 0; i < postList.size(); ) {
            Post post = postList.get(i);
            statement.bind(0, post.getTitle())
                    .bind(1, post.getPost());
            // Call add() only between bindings, never after the last one
            if (++i < postList.size()) {
                statement.add();
            }
        }
        return Flux.from(statement.execute())
                .flatMap(result -> (Flux<Post>) result.map((row, rowMetadata) -> {
                    Integer id = row.get("ID", Integer.class);
                    String post = row.get("POST", String.class);
                    String title = row.get("TITLE", String.class);
                    // Argument order follows the field order: id, title, post
                    return new Post(id, title, post);
                }));
    });
}
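The "add() between bindings, but not after the last one" rule can be isolated in a small helper. The following is only a sketch that does not depend on any driver: BindingSink and RecordingSink are hypothetical stand-ins for the two Statement operations used above, so that the call order can be checked without a database. With a real Statement, the bind and add calls would go to the statement itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

class BindAllSketch {

    // Hypothetical stand-in for the add() operation of a statement.
    interface BindingSink {
        void add();
    }

    // Binds every item and calls add() only between items, never after the last one.
    static <T, S extends BindingSink> void bindAll(S sink, Iterable<T> items, BiConsumer<S, T> binder) {
        Iterator<T> it = items.iterator();
        while (it.hasNext()) {
            binder.accept(sink, it.next());
            if (it.hasNext()) {
                sink.add();
            }
        }
    }

    // Records the call sequence so the ordering can be inspected.
    static class RecordingSink implements BindingSink {
        final List<String> calls = new ArrayList<>();

        void bind(String value) {
            calls.add("bind:" + value);
        }

        @Override
        public void add() {
            calls.add("add");
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        bindAll(sink, List.of("a", "b", "c"), RecordingSink::bind);
        System.out.println(sink.calls); // [bind:a, add, bind:b, add, bind:c]
    }
}
```

Using an iterator instead of an index keeps the "is there a next element" check in one place and also works for collections that are not lists.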
Upvotes: 0
Reputation: 340
There are two questions:
Would DatabaseClient.insert() insert them in one batch or not?
Not a batch.
Is it possible to create a batch insert in r2dbc (other than Connection.createBatch())?
No, Connection.createBatch() is the only way to create a Batch for now.
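Because a Batch only accepts complete SQL strings through Batch.add(String sql), the statements have to be rendered from the entity by hand. Below is a minimal sketch of that step, assuming a hypothetical Post class with title and content fields and a posts table. The quote helper is a naive illustration of literal escaping (it doubles single quotes) and is not a substitute for bound parameters when the input is untrusted.

```java
import java.util.List;
import java.util.stream.Collectors;

class BatchSqlSketch {

    // Hypothetical entity, used only for this illustration.
    static class Post {
        final String title;
        final String content;

        Post(String title, String content) {
            this.title = title;
            this.content = content;
        }
    }

    // Naive SQL string literal: wraps the value in single quotes and doubles embedded quotes.
    static String quote(String value) {
        return value == null ? "NULL" : "'" + value.replace("'", "''") + "'";
    }

    // Renders one complete INSERT statement per entity.
    static List<String> toInsertStatements(List<Post> posts) {
        return posts.stream()
                .map(p -> "INSERT INTO posts (title, content) VALUES ("
                        + quote(p.title) + ", " + quote(p.content) + ")")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> statements = toInsertStatements(
                List.of(new Post("test", "content"), new Post("it's", null)));
        statements.forEach(System.out::println);
        // With a real io.r2dbc.spi.Connection the strings would be handed over like this:
        //   Batch batch = connection.createBatch();
        //   statements.forEach(batch::add);
        //   Flux.from(batch.execute())
    }
}
```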
Upvotes: 7
Reputation: 9261
There is no direct support so far, but I found that it is possible to use the Connection directly to work around this; see the issue spring-data-r2dbc#259.
The Statement has an add() method that saves the current binding and starts a new one, so the parameters can be bound repeatedly.
The complete code of my solution can be found here.
return this.databaseClient.inConnectionMany(connection -> {
    var statement = connection.createStatement("INSERT INTO posts (title, content) VALUES ($1, $2)")
            .returnGeneratedValues("id");
    for (var p : data) {
        statement.bind(0, p.getTitle()).bind(1, p.getContent()).add();
    }
    return Flux.from(statement.execute())
            .flatMap(result -> result.map((row, rowMetadata) -> row.get("id", UUID.class)));
});
A test for this method.
@Test
public void testSaveAll() {
    var data = Post.builder().title("test").content("content").build();
    var data1 = Post.builder().title("test1").content("content1").build();
    var result = posts.saveAll(List.of(data, data1)).log("[Generated result]")
            .doOnNext(id -> log.info("generated id: {}", id));
    assertThat(result).isNotNull();
    result.as(StepVerifier::create)
            .expectNextCount(2)
            .verifyComplete();
}
The generated ids are printed as expected in the console.
...
2020-10-08 11:29:19,662 INFO [reactor-tcp-nio-2] reactor.util.Loggers$Slf4JLogger:274 onNext(a3105647-a4bc-4986-9ad4-1e6de901449f)
2020-10-08 11:29:19,664 INFO [reactor-tcp-nio-2] com.example.demo.PostRepositoryTest:31 generated id: a3105647-a4bc-4986-9ad4-1e6de901449f
//.....
2020-10-08 11:29:19,671 INFO [reactor-tcp-nio-2] reactor.util.Loggers$Slf4JLogger:274 onNext(a611d766-f983-4c8e-9dc9-fc78775911e5)
2020-10-08 11:29:19,671 INFO [reactor-tcp-nio-2] com.example.demo.PostRepositoryTest:31 generated id: a611d766-f983-4c8e-9dc9-fc78775911e5
//......
Process finished with exit code 0
Upvotes: 9