Hadi Mansouri
Hadi Mansouri

Reputation: 96

How to create PostgreSQL savePoint with Reactor and Spring

io.r2dbc.spi.Connection and PostgresqlConnection have createSavepoint, releaseSavepoint and rollbackTransactionToSavepoint methods. How can I use these method by having R2dbcTransactionManager and TransactionalOperator?

I want to create an idempotent service which tries to insert to a table, if a unique constraint is violated then selects the existing record and continues with that

Mono.just(newOrder)
    .flatMap(order -> orderRepository.save(order)
        .onErrorResume(throwable -> orderRepository.findByUniqueField(newOrder.uniqueField))

    .otherProcesses...
    .as(transactionalOperator::transactional)

And I receive current transaction is aborted, commands ignored until end of transaction block from PostgreSQL

I saw this answer https://stackoverflow.com/a/48771320/5275087 but it seams autosave=always doesn't work with r2dbc

I wanted to try something like:

transactionalOperator.execute(reactiveTransaction -> {
  GenericReactiveTransaction genericReactiveTransaction = (GenericReactiveTransaction) reactiveTransaction;
  ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) genericReactiveTransaction.getTransaction();
  Connection connection = txObject.getConnectionHolder().getConnection();
  return Mono.from(connection.createSavepoint(TRANSACTION_LEVEL_1))
            .then(Mono.just(newOrder)
                .flatMap(order -> orderRepository.save(order)
                    .onErrorResume(throwable ->
                        Mono.from(connection.rollbackTransactionToSavepoint(TRANSACTION_LEVEL_1))
                            .then(orderRepository.findByUniqueField(newOrder.uniqueField)))
                ));

But R2dbcTransactionManager.ConnectionFactoryTransactionObject is a private class.

How do I achieve this without using reflection

Upvotes: 1

Views: 249

Answers (0)

Related Questions