wladez

Reputation: 13

RabbitTransactionManager cannot commit transaction at ChainedTransactionManager

I'm trying to use a single transaction manager for both Rabbit and Kafka. I first receive a message in a Rabbit listener callback, then send it to a Kafka topic. However, I always get an exception indicating that Rabbit cannot complete the transaction:

2019-11-14 16:02:46.572 ERROR 15640 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Could not configure the channel to receive publisher confirms
java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:602)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:582)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$600(CachingConnectionFactory.java:99)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1053)
    at com.sun.proxy.$Proxy124.txCommit(Unknown Source)
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164)
    at org.springframework.amqp.rabbit.transaction.RabbitTransactionManager.doCommit(RabbitTransactionManager.java:187)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:532)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.listener.ExecutionCallbackListener$$EnhancerBySpringCGLIB$$9b575a95.receiveCallback(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1445)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1368)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1355)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1334)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode, class-id=85, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 37 common frames omitted

Here is the Rabbit listener method where the problem occurs:

@RabbitListener(queues = ["\${queue}"])
@Transactional("chainedTransactionManager")
fun receiveCallback(message: Message<List<CallbackMessage>>) {
    traceMessage(message)
    val callbacks = message.payload
    callbacks.forEach { callback ->
        kafkaService.sendAfterCallBack(Object())
    }
}

And the method in KafkaService:

@Transactional("chainedTransactionManager")
fun sendAfterCallBack(payload: Any) {
    convertAndSend(kafkaServiceProperties.topics.name, payload)
}

Here is the TransactionManager configuration:

@Configuration
class TransactionManagerConfiguration {

    @Bean
    fun chainedTransactionManager(
            rabbitTransactionManager: RabbitTransactionManager,
            kafkaTransactionManager: KafkaTransactionManager<*, *>
    ): ChainedTransactionManager {
        return ChainedTransactionManager(kafkaTransactionManager, rabbitTransactionManager)
    }
}

Rabbit configuration:

@Configuration
@EnableRabbit
@Import(RabbitAutoCreationConfiguration::class)
class RabbitConfiguration(
        private val integrationProperties: IntegrationProperties,
        private var clientProperties: RabbitClientProperties,
        private val jacksonObjectMapper: ObjectMapper
) : RabbitListenerConfigurer {

    @Bean
    fun rabbitListenerContainerFactory(connectionFactory: ConnectionFactory): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        factory.setErrorHandler { t -> throw AmqpRejectAndDontRequeueException(t) }

        return factory
    }

    @Bean
    fun messageConverter(): MessageConverter {
        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = jacksonObjectMapper

        return messageConverter
    }

    @Bean
    fun messageHandlerFactory(): MessageHandlerMethodFactory {
        val factory = DefaultMessageHandlerMethodFactory()
        factory.setMessageConverter(messageConverter())
        return factory
    }

    @Bean
    @ConditionalOnBean(CachingConnectionFactory::class)
    fun rabbitConnectionFactoryCustomizer(factory: CachingConnectionFactory): SmartInitializingSingleton {
        return SmartInitializingSingleton {
            factory.rabbitConnectionFactory.clientProperties.apply {
                clientProperties.copyright?.let { put("copyright", it) }

                put("os", System.getProperty("os.name"))
                put("host", InetAddress.getLocalHost().hostName)

                clientProperties.platform?.let { put("platform", it) }
                clientProperties.product?.let { put("product", it) }
                clientProperties.service?.let { put("service", it) }
            }
        }
    }

    override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar?) {
        registrar!!.messageHandlerMethodFactory = messageHandlerFactory()
    }

    @Bean
    fun rabbitTemplate(
            connectionFactory: ConnectionFactory,
            jsonObjectMapper: ObjectMapper
    ): RabbitTemplate {
        val rabbitTemplate = RabbitTemplate(connectionFactory)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(SimpleRetryPolicy(integrationProperties.callbackRetry))
        rabbitTemplate.setRetryTemplate(retryTemplate)
        rabbitTemplate.isChannelTransacted = true

        return rabbitTemplate
    }

    @Bean
    fun rabbitTransactionManager(connectionFactory: ConnectionFactory): RabbitTransactionManager {
        val rtm = RabbitTransactionManager(connectionFactory)
        rtm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
        return rtm
    }
}

Kafka configuration:

@Configuration
@EnableKafka
class KafkaConfiguration(
        @Qualifier("kafkaExchangeMessageConverter")
        private val messageConverter: MessagingMessageConverter
) {

    @Bean
    fun kafkaListenerContainerFactory(
            configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
            consumerFactory: ConsumerFactory<Any, Any>
    ): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.setMessageConverter(messageConverter)
        configurer.configure(factory, consumerFactory)

        return factory
    }

    @Bean
    fun adminClient(kafkaAdmin: KafkaAdmin): AdminClient = AdminClient.create(kafkaAdmin.config)

    @Bean
    fun kafkaTransactionManager(
            producerFactory: ProducerFactory<*, *>
    ): KafkaTransactionManager<*, *> {
        val ktm = KafkaTransactionManager(producerFactory)

        ktm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION

        return ktm
    }
}

Did I miss something in RabbitConfiguration, or is the problem somewhere else?

Upvotes: 1

Views: 1311

Answers (1)

Gary Russell

Reputation: 174709

reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode,

You cannot use publisher confirms and transactions on the same channel. Turn off publisher confirms.
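
As a rough sketch of what turning confirms off might look like, assuming Spring AMQP 2.2 or later and a connection factory that you define yourself. The `RabbitConnectionConfiguration` class and the `localhost` host are placeholders, not taken from the question. If the factory is auto-configured by Spring Boot instead, the equivalent is most likely the `spring.rabbitmq.publisher-confirm-type=none` property (or `spring.rabbitmq.publisher-confirms=false` before Boot 2.2), so check that property first.

```kotlin
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical configuration class, shown only for illustration.
@Configuration
class RabbitConnectionConfiguration {

    @Bean
    fun connectionFactory(): CachingConnectionFactory {
        // "localhost" is a placeholder host, not from the question.
        val factory = CachingConnectionFactory("localhost")

        // NONE disables publisher confirms, so channels created by this
        // factory are not put into confirm mode and can be used with
        // transactions (txSelect / txCommit).
        // Assumes Spring AMQP 2.2+; older versions expose
        // setPublisherConfirms(false) instead.
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.NONE)

        return factory
    }
}
```

Note that declaring your own `ConnectionFactory` bean replaces the auto-configured one, so other `spring.rabbitmq.*` connection settings would then have to be applied manually.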

Also, it's better to inject the chained transaction manager into the listener container rather than using @Transactional.
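
A minimal sketch of that suggestion, assuming the `chainedTransactionManager` bean from the question is available for injection. The `ListenerContainerConfiguration` class name is hypothetical; in the question's code this would replace the existing `rabbitListenerContainerFactory` bean in `RabbitConfiguration`.

```kotlin
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.transaction.ChainedTransactionManager

// Hypothetical configuration class, shown only for illustration.
@Configuration
class ListenerContainerConfiguration {

    @Bean
    fun rabbitListenerContainerFactory(
            connectionFactory: ConnectionFactory,
            chainedTransactionManager: ChainedTransactionManager
    ): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)

        // The container starts the chained transaction itself before
        // invoking the listener, so the listener method no longer needs
        // @Transactional("chainedTransactionManager").
        factory.setChannelTransacted(true)
        factory.setTransactionManager(chainedTransactionManager)

        // Same error handler as in the question's configuration.
        factory.setErrorHandler { t -> throw AmqpRejectAndDontRequeueException(t) }

        return factory
    }
}
```

With this in place, the `@Transactional` annotation on `receiveCallback` can be dropped, since the listener is already invoked inside the container-managed transaction.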

Upvotes: 1
