I am trying to execute 3 inserts within the same transaction, but I cannot get the transaction rolled back when one of the inserts fails.
I am new to the reactive world and this is my very first reactive application.
Here is a simplification of the database model:
EntityA 1---N EntityB
EntityA 1---N EntityC
I want to execute the following inserts within the same transaction:
INSERT INTO A
INSERT INTO B --(failing query)
INSERT INTO C
But when the second insert fails, the first insert is not rolled back.
I've got the following classes:
Processor: receives a message from Kafka and triggers the inserts through a WorkerService
WorkerService: runs the 3 inserts by using 3 DAOs
EntityADao: runs the insert of entity A
EntityBDao: runs the insert of entity B
EntityCDao: runs the insert of entity C
@ApplicationScoped
public class Processor {

    private final WorkerService service;

    public Processor(final WorkerService service) {
        this.service = service;
    }

    @Incoming("input-channel")
    @Outgoing("output-channel")
    public Uni<Message<RequestMessage>> process(final Message<RequestMessage> message) {
        final RequestMessage rm = message.getPayload();
        return service.saveEntities(rm)
                .onFailure()
                .recoverWithItem(e -> {
                    final String errorMessage = "There was an unexpected error while saving entities";
                    LOG.error(errorMessage, e);
                    return Result.KO;
                })
                .flatMap(result -> {
                    rm.setResult(result);
                    // Acking the outgoing message acknowledges the incoming one
                    return Uni.createFrom()
                            .item(Message.of(rm, message::ack));
                });
    }
}
@ApplicationScoped
public class WorkerService {

    private final EntityADao entityADao;
    private final EntityBDao entityBDao;
    private final EntityCDao entityCDao;

    public WorkerService(final EntityADao entityADao,
                         final EntityBDao entityBDao,
                         final EntityCDao entityCDao) {
        this.entityADao = entityADao;
        this.entityBDao = entityBDao;
        this.entityCDao = entityCDao;
    }

    @Transactional(TxType.REQUIRED)
    public Uni<Result> saveEntities(final RequestMessage requestMessage) {
        return Uni.createFrom().item(Result.OK)
                // Save Entity A
                .flatMap(result -> {
                    LOG.debug("(1) Saving EntityA ...");
                    return entityADao.save(requestMessage.getEntityAData());
                })
                // Save Entity B
                .flatMap(result -> {
                    LOG.debug("(2) Saving EntityB ...");
                    return entityBDao.save(requestMessage.getEntityBData());
                })
                // Save Entity C
                .flatMap(result -> {
                    LOG.debug("(3) Saving EntityC ...");
                    return entityCDao.save(requestMessage.getEntityCData());
                })
                // Return OK
                .flatMap(result -> Uni.createFrom().item(Result.OK));
    }
}
@ApplicationScoped
public class EntityADao {

    private final PgPool client;

    public EntityADao(final PgPool client) {
        this.client = client;
    }

    @Transactional(TxType.MANDATORY)
    public Uni<Result> save(final EntityAData entityAData) {
        return client
                .preparedQuery(
                        "INSERT INTO A(col1, col2, col3) " +
                        "VALUES ($1, $2, $3)")
                .execute(Tuple.of(entityAData.col1(), entityAData.col2(), entityAData.col3()))
                // map, not flatMap: the lambda returns a plain Result, not a Uni
                .map(pgRowSet -> {
                    LOG.debug("Inserted EntityA!");
                    return Result.OK;
                });
    }
}
EntityBDao and EntityCDao are like EntityADao.
I have already added the following dependencies to pom.xml:
quarkus-smallrye-context-propagation
quarkus-narayana-jta
Why, when the INSERT B query in EntityBDao fails, is the previously executed query (INSERT A) not rolled back? What am I missing? What would I have to change to get this working?
Upvotes: 2
This paragraph recently added to our Quarkus documentation should help you with this: https://quarkus.io/guides/reactive-sql-clients#transactions .
It specifically explains how to deal with transactions when using the Reactive SQL clients.
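As a rough illustration of the idea in that section: the guide runs all statements through the pool's `withTransaction` method, so they share one connection and are committed or rolled back together. The sketch below is not Quarkus code. It is a library-free model of that behavior built only on `CompletableFuture`, and every name in it (`TransactionScopeSketch`, `Tx`, `insert`, `committedRows`, and this toy `withTransaction`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class TransactionScopeSketch {

    // Hypothetical in-memory "database": rows only appear here on commit.
    static final List<String> committedRows = new ArrayList<>();

    // Hypothetical stand-in for one connection with an open transaction.
    static final class Tx {
        final List<String> pending = new ArrayList<>();

        // Simulates an asynchronous INSERT that either succeeds or fails.
        CompletableFuture<Void> insert(String table, boolean fail) {
            if (fail) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("INSERT INTO " + table + " failed"));
            }
            pending.add(table);
            return CompletableFuture.completedFuture(null);
        }
    }

    // Toy transaction scope: commit if the whole pipeline succeeds,
    // otherwise drop the pending rows (the "rollback").
    static <T> CompletableFuture<T> withTransaction(Function<Tx, CompletableFuture<T>> work) {
        Tx tx = new Tx();
        return work.apply(tx).whenComplete((item, failure) -> {
            if (failure == null) {
                committedRows.addAll(tx.pending);
            }
        });
    }

    // All three inserts are chained inside the same transaction scope.
    static CompletableFuture<String> saveEntities(boolean failOnB) {
        CompletableFuture<String> result = withTransaction(tx ->
                tx.insert("A", false)
                        .thenCompose(ignored -> tx.insert("B", failOnB))
                        .thenCompose(ignored -> tx.insert("C", false))
                        .thenApply(ignored -> "OK"));
        // Recover only after the transaction scope has seen the failure.
        return result.exceptionally(e -> "KO");
    }
}
```

Calling `saveEntities(true)` in this model completes with "KO" and leaves `committedRows` empty, because the failure of B reaches the transaction scope before it is recovered. The point carried over to the real application is that all three inserts have to be chained inside one transaction scope on one connection, rather than each DAO calling the pool on its own.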
Upvotes: 4