simsini2

Reputation: 13

Why does KafkaTemplate not close transactional producers?

I have written a simple Kafka app with Spring Integration Kafka 3.2.1.RELEASE and kafka-clients 2.5 to learn Kafka transactions.

It receives messages from one topic and sends them to another topic. The beans.xml file is as follows:

    <int-kafka:message-driven-channel-adapter
            listener-container="container"
            auto-startup="true"
            send-timeout="30000"
            channel="channelA"/>
   <bean id="container"  class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg
                        name="topics"
                        value="test"/>
                <property name="transactionManager" ref="KafkaTransactionManager"/>
            </bean>
        </constructor-arg>
    </bean>
...
    <int-kafka:outbound-channel-adapter kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        channel="channelB"
                                        topic="output"/>
      <bean id="dbsenderTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                        <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                        <entry key="bootstrap.servers" value="localhost:9092"/>
                    </map>
                </constructor-arg>
                <property name="transactionIdPrefix" value="mytest-"/>
                <property name="producerPerConsumerPartition" value="false"/>
            </bean>
        </constructor-arg>
    </bean>

The code that starts the app is as follows:

    GenericXmlApplicationContext tempContext = new GenericXmlApplicationContext("beans.xml");
    tempContext.close();

    //POINT A

    try {
        Thread.sleep(60000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    GenericXmlApplicationContext context = new GenericXmlApplicationContext();
    context.load("beans.xml");
    context.refresh();

    //POINT B

At POINT A I closed the context just to check which beans get closed, and added a 60-second sleep so I had time to look at the JMX console. I noticed that even though the context is closed, the producer is still registered in JMX. I then traced the code and saw that on context close the KafkaTemplate calls the following code:

    public void flush() {
        Producer<K, V> producer = getTheProducer();
        try {
            producer.flush();
        }
        finally {
            closeProducer(producer, inTransaction());
        }
    }

    protected void closeProducer(Producer<K, V> producer, boolean inTx) {
        if (!inTx) {
            producer.close(this.closeTimeout);
        }
    }

That is, it creates a producer, but because it is transactional it is not closed.

Because of this behaviour, starting the context again at POINT B and sending a message causes javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-mytest-0. Why does the KafkaTemplate not close these producers?
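
As a side note, the producer MBeans that are still registered can be listed from within the same JVM instead of a JMX console; a minimal sketch:

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ProducerMBeanCheck {

        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Prints every registered producer MBean,
            // e.g. kafka.producer:type=app-info,id=producer-mytest-0
            server.queryNames(new ObjectName("kafka.producer:*"), null)
                    .forEach(System.out::println);
        }
    }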

Another question: what happens to these producers when a new KafkaTemplate is created at POINT B?

My last question: if I change the producerPerConsumerPartition property to true, the app still registers a producer MBean named producer-mytest-0 and does not follow the groupid.topic.partition naming pattern. Is this correct behaviour?

UPDATES:

I now understand what happens when KafkaTemplate.executeInTransaction is called. In its finally block it calls close on the producer, and since this is a logical close, the following code in CloseSafeProducer runs and puts the producer into the cache:

    if (!this.cache.contains(this)
            && !this.cache.offer(this)) {
        this.delegate.close(closeTimeout);
    }

As a result, when the context is closed, the destroy method of DefaultKafkaProducerFactory clears the cache and physically closes the producers. But in my situation the application context is created and then closed before any message is consumed or produced; only the flush method of KafkaTemplate is called internally, which forces it to create a transactional producer but does not put it in the cache. Since I never started a producer myself and the KafkaTemplate creates one on flush, wouldn't it be better if DefaultKafkaProducerFactory put such producers in the cache before using them?
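
To illustrate the path described above, here is a minimal sketch of sending through executeInTransaction, where the producer does end up back in the cache (it assumes the kafkaTemplate built from the configuration earlier):

    import org.springframework.kafka.core.KafkaTemplate;

    public class TransactionSketch {

        public void sendInTransaction(KafkaTemplate<String, String> kafkaTemplate) {
            kafkaTemplate.executeInTransaction(ops -> {
                // A transactional producer is created here, or reused from the factory cache
                ops.send("output", "key", "value");
                return true;
            });
            // In the finally block of executeInTransaction the producer is "closed";
            // for a CloseSafeProducer this is a logical close that offers it back to the cache
        }
    }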

Upvotes: 1

Views: 2359

Answers (1)

Gary Russell

Reputation: 174534

The producer cannot be closed if this template operation is participating in a transaction that was started outside of the template.

Even when closed, it is only "logically" closed - cached for reuse by another operation.

Is this correct behaviour?

Yes, for producer-initiated transactions; the alternative name is used when a consumer initiates the transaction.

The InstanceAlreadyExistsException problem is simply because you are creating two application contexts with identical configuration. Why are you doing that?

Upvotes: 2
