io.r2dbc.spi.Connection and PostgresqlConnection have createSavepoint, releaseSavepoint and rollbackTransactionToSavepoint methods.
How can I use these methods together with R2dbcTransactionManager and TransactionalOperator?
I want to create an idempotent service that tries to insert into a table and, if a unique constraint is violated, selects the existing record and continues with that:
    Mono.just(newOrder)
        .flatMap(order -> orderRepository.save(order)
            .onErrorResume(throwable -> orderRepository.findByUniqueField(newOrder.uniqueField)))
        .otherProcesses...
        .as(transactionalOperator::transactional)
With this, I receive the error "current transaction is aborted, commands ignored until end of transaction block" from PostgreSQL.
I saw this answer https://stackoverflow.com/a/48771320/5275087 but it seems autosave=always doesn't work with r2dbc.
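To make the behaviour I am after concrete, here is a toy model in plain Java with no database or R2DBC involved. FakeTx and both insertOrFetch helpers are hypothetical names made up for illustration. The sketch only assumes that a failed statement aborts the transaction until it is rolled back to a savepoint, which matches the error above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: FakeTx is a hypothetical stand-in for one PostgreSQL transaction.
final class SavepointDemo {

    static final class FakeTx {
        private final Map<String, String> rows = new HashMap<>(); // uniqueField -> order id
        private final Map<String, Map<String, String>> savepoints = new HashMap<>();
        private boolean aborted = false;

        // Once a statement has failed, every further command is rejected.
        private void ensureUsable() {
            if (aborted) {
                throw new IllegalStateException(
                    "current transaction is aborted, commands ignored until end of transaction block");
            }
        }

        void createSavepoint(String name) {
            ensureUsable();
            savepoints.put(name, new HashMap<>(rows)); // snapshot of the current state
        }

        // Restores the snapshot and makes the transaction usable again.
        void rollbackToSavepoint(String name) {
            Map<String, String> snapshot = savepoints.get(name);
            if (snapshot == null) {
                throw new IllegalArgumentException("no such savepoint: " + name);
            }
            rows.clear();
            rows.putAll(snapshot);
            aborted = false;
        }

        void insert(String uniqueField, String id) {
            ensureUsable();
            if (rows.containsKey(uniqueField)) {
                aborted = true; // a unique violation aborts the whole transaction
                throw new IllegalStateException("duplicate key value violates unique constraint");
            }
            rows.put(uniqueField, id);
        }

        String findByUniqueField(String uniqueField) {
            ensureUsable();
            return rows.get(uniqueField);
        }
    }

    // What my current code effectively does: the fallback select runs in an aborted transaction.
    static String insertOrFetchNaive(FakeTx tx, String uniqueField, String id) {
        try {
            tx.insert(uniqueField, id);
            return id;
        } catch (IllegalStateException e) {
            return tx.findByUniqueField(uniqueField); // throws the "aborted" error
        }
    }

    // What I want: roll back to a savepoint first, then select the existing row.
    static String insertOrFetchWithSavepoint(FakeTx tx, String uniqueField, String id) {
        tx.createSavepoint("before_insert");
        try {
            tx.insert(uniqueField, id);
            return id;
        } catch (IllegalStateException e) {
            tx.rollbackToSavepoint("before_insert");
            return tx.findByUniqueField(uniqueField);
        }
    }

    public static void main(String[] args) {
        FakeTx tx = new FakeTx();
        tx.insert("A-1", "order-1");
        // prints "order-1": the existing row is returned after the rollback
        System.out.println(insertOrFetchWithSavepoint(tx, "A-1", "order-2"));
        try {
            insertOrFetchNaive(tx, "A-1", "order-3");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints the "aborted" message
        }
    }
}
```

The savepoint variant is the flow I would like to reproduce reactively inside the transaction managed by TransactionalOperator.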
I wanted to try something like:
    transactionalOperator.execute(reactiveTransaction -> {
        GenericReactiveTransaction genericReactiveTransaction = (GenericReactiveTransaction) reactiveTransaction;
        // Does not compile: ConnectionFactoryTransactionObject is private to R2dbcTransactionManager.
        ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) genericReactiveTransaction.getTransaction();
        Connection connection = txObject.getConnectionHolder().getConnection();
        return Mono.from(connection.createSavepoint(TRANSACTION_LEVEL_1))
            .then(Mono.just(newOrder)
                .flatMap(order -> orderRepository.save(order)
                    // On failure, roll back to the savepoint, then fetch the existing row.
                    .onErrorResume(throwable ->
                        Mono.from(connection.rollbackTransactionToSavepoint(TRANSACTION_LEVEL_1))
                            .then(orderRepository.findByUniqueField(newOrder.uniqueField)))
                ));
    });
But R2dbcTransactionManager.ConnectionFactoryTransactionObject is a private class.
How do I achieve this without using reflection?