Reputation: 31
I'm trying to use one transaction manager (ChainedTransactionManager) for Rabbit and Kafka, chaining a RabbitTransactionManager and a KafkaTransactionManager, with the intent of achieving a best-effort one-phase commit.
To test it, the transactional method throws an exception after the two operations (sending a message to a Rabbit exchange and publishing an event to Kafka). When the test runs, the logs suggest a rollback is initiated, but the message ends up in Rabbit anyway.
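The test drives the transactional method through the Spring proxy, i.e. from another bean, roughly like this (a minimal sketch; the test class and service bean names are illustrative, not the real ones):
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProcessMessageAndEventTest {

    @Autowired
    MessageAndEventService service; // hypothetical bean declaring processMessageAndEvent()

    @Test
    void rollsBackRabbitAndKafka() {
        // the method is expected to throw, which should trigger the chained rollback
        assertThrows(RuntimeException.class, () -> service.processMessageAndEvent());
        // we then check manually that nothing ended up in the Rabbit queue or the Kafka topic
    }

}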
Here's the method where the problem occurs:
@Transactional
public void processMessageAndEvent() {
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);

    outputToKafkaTopic.output().send(MessageBuilder
            .withPayload("Message to Kafka")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
            .build());

    throw new RuntimeException("We want the previous changes to rollback");
}
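The outputToRabbitMQExchange and outputToKafkaTopic fields are bound output channels; simplified, the binding interfaces look like this (a sketch only; the channel names match the yaml below):
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

interface OutputToRabbitMQExchange {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel output();

}

interface OutputToKafkaTopic {

    @Output("sink_outputToKafkaTopic")
    MessageChannel output();

}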
Here is the main Spring Boot application configuration:
@SpringBootApplication
@EnableTransactionManagement
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

}
Here is the transaction manager configuration:
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm, rtm);
}
And finally, the relevant configuration in the application.yml file:
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}
When we execute the method, the message is still in Rabbit, despite the logs saying the transaction is rolled back.
Is there anything we could be missing or have misunderstood?
Upvotes: 0
Views: 356
Reputation: 174719
@EnableBinding
is deprecated in favor of the newer functional programming model.
That said, I copied your code/config pretty much as-is (transacted is not a Kafka producer binding property) and it works fine for me (Boot 2.4.5, Spring Cloud 2020.0.2)...
@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: localhost:9092

logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug
2021-04-28 09:35:32.488 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [ main] o.s.a.r.connection.RabbitResourceHolder : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://[email protected]:5672/, localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()
And there is no message in the queue that I bound to the exchange with routing key #; the binding I used for that check is sketched below.
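A minimal sketch of that verification binding (bean names are illustrative; it assumes the Rabbit binder's default topic exchange, which is named after the destination):
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class VerificationConfig {

    @Bean
    Queue verifyQueue() {
        return new AnonymousQueue();
    }

    @Bean
    TopicExchange destinationExchange() {
        // the Rabbit binder declares a topic exchange named after the destination
        return new TopicExchange("outputToRabbitMQExchange");
    }

    @Bean
    Binding verifyBinding(Queue verifyQueue, TopicExchange destinationExchange) {
        // catch-all routing key so any routed message would land in the queue
        return BindingBuilder.bind(verifyQueue).to(destinationExchange).with("#");
    }

}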
What versions are you using?
EDIT
And here is the equivalent app after removing the deprecations, using the functional model and StreamBridge (same yaml):
@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange", MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bridge.send("sink_outputToKafkaTopic", MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
Upvotes: 1