Reputation: 13
I'm trying to use one transaction manager for Rabbit and Kafka. First I'm getting a message from a Rabbit callback, then sending it to Kafka topic. But I always get an exception indicating that Rabbit cannot finish the transaction correctly:
2019-11-14 16:02:46.572 ERROR 15640 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Could not configure the channel to receive publisher confirms java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:582)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$600(CachingConnectionFactory.java:99)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1053)
at com.sun.proxy.$Proxy124.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164)
at org.springframework.amqp.rabbit.transaction.RabbitTransactionManager.doCommit(RabbitTransactionManager.java:187)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:532)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.listener.ExecutionCallbackListener$$EnhancerBySpringCGLIB$$9b575a95.receiveCallback(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1445)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1368)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1355)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1334)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode, class-id=85, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
... 37 common frames omitted
Here is the method where problem occurs in Rabbit listener:
@RabbitListener(queues = ["\${queue}"])
@Transactional("chainedTransactionManager")
fun receiveCallback(message: Message<List<CallbackMessage>>) {
traceMessage(message)
val callbacks = message.payload
callbacks.forEach { callback ->
kafkaService.sendAfterCallBack(Object())
}
}
And method in KafkaService
:
@Transactional("chainedTransactionManager")
fun sendAfterCallBack(object: Object) {
convertAndSend(kafkaServiceProperties.topics.name, object)
}
Here is TransactionManager
configuration:
@Configuration
class TransactionManagerConfiguration {
@Bean
fun chainedTransactionManager(
rabbitTransactionManager: RabbitTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
): ChainedTransactionManager {
return ChainedTransactionManager(kafkaTransactionManager, rabbitTransactionManager)
}
}
Rabbit configuration:
@Configuration
@EnableRabbit
@Import(RabbitAutoCreationConfiguration::class)
class RabbitConfiguration(
private val integrationProperties: IntegrationProperties,
private var clientProperties: RabbitClientProperties,
private val jacksonObjectMapper: ObjectMapper
) : RabbitListenerConfigurer {
@Bean
fun rabbitListenerContainerFactory(connectionFactory: ConnectionFactory): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(connectionFactory)
factory.setErrorHandler { t -> throw AmqpRejectAndDontRequeueException(t) }
return factory
}
@Bean
fun messageConverter(): MessageConverter {
val messageConverter = MappingJackson2MessageConverter()
messageConverter.objectMapper = jacksonObjectMapper
return messageConverter
}
@Bean
fun messageHandlerFactory(): MessageHandlerMethodFactory {
val factory = DefaultMessageHandlerMethodFactory()
factory.setMessageConverter(messageConverter())
return factory
}
@Bean
@ConditionalOnBean(CachingConnectionFactory::class)
fun rabbitConnectionFactoryCustomizer(factory: CachingConnectionFactory): SmartInitializingSingleton {
return SmartInitializingSingleton {
factory.rabbitConnectionFactory.clientProperties.apply {
clientProperties.copyright?.let { put("copyright", it) }
put("os", System.getProperty("os.name"))
put("host", InetAddress.getLocalHost().hostName)
clientProperties.platform?.let { put("platform", it) }
clientProperties.product?.let { put("product", it) }
clientProperties.service?.let { put("service", it) }
}
}
}
override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar?) {
registrar!!.messageHandlerMethodFactory = messageHandlerFactory()
}
@Bean
fun rabbitTemplate(
connectionFactory: ConnectionFactory,
jsonObjectMapper: ObjectMapper
): RabbitTemplate {
val rabbitTemplate = RabbitTemplate(connectionFactory)
val retryTemplate = RetryTemplate()
retryTemplate.setRetryPolicy(SimpleRetryPolicy(integrationProperties.callbackRetry))
rabbitTemplate.setRetryTemplate(retryTemplate)
rabbitTemplate.isChannelTransacted = true
return rabbitTemplate
}
@Bean
fun rabbitTransactionManager(connectionFactory: ConnectionFactory): RabbitTransactionManager {
val rtm = RabbitTransactionManager(connectionFactory)
rtm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
return rtm
}
}
Kafka configuration:
@Configuration
@EnableKafka
class KafkaConfiguration(
@Qualifier("kafkaExchangeMessageConverter")
private val messageConverter: MessagingMessageConverter
) {
@Bean
fun kafkaListenerContainerFactory(
configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
consumerFactory: ConsumerFactory<Any, Any>
): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
factory.setMessageConverter(messageConverter)
configurer.configure(factory, consumerFactory)
return factory
}
@Bean
fun adminClient(kafkaAdmin: KafkaAdmin): AdminClient = AdminClient.create(kafkaAdmin.config)
@Bean
fun kafkaTransactionManager(
producerFactory: ProducerFactory<*, *>
): KafkaTransactionManager<*, *> {
val ktm = KafkaTransactionManager(producerFactory)
ktm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
return ktm
}
}
Did I miss something in RabbitConfiguration
or problem is in something else?
Upvotes: 1
Views: 1311
Reputation: 174709
reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode,
You cannot use publisher confirms and transactions on the same channel. Turn off publisher confirms.
Also, it's better to inject the chained transaction manager into the listener container rather than using @Transactional
.
Upvotes: 1