Bert S.

Reputation: 155

Spring Cloud Stream Kafka Binder KafkaTransactionManager results in a cycle in application context

I am setting up a basic Spring Cloud Stream producer with Kafka. The intent is to accept an HTTP POST, save the posted data to a database with Spring Data JPA, and write the result to a Kafka topic using the Spring Cloud Stream Kafka Binder. I am following the latest binder documentation on how to set up a KafkaTransactionManager, but this code fails on application startup with the following error:

***************************
APPLICATION FAILED TO START
***************************

Description:

The dependencies of some of the beans in the application context form a cycle:

┌─────┐
|  kafkaTransactionManager defined in com.example.tx.Application
↑     ↓
|  org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration
└─────┘

I have the following bean defined in my Application class, which is the same as the one in the documentation.

@Bean
public KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager(BinderFactory binders) {
    // Look up the default binder and take its transactional producer factory
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionIdPrefix("tx-test");
    return tm;
}

It seems that calling getBinder causes Spring to create the context again. How can I resolve this circular dependency?

Dependencies: Spring Boot parent 2.4.6; Spring Cloud BOM 2020.0.3

Upvotes: 0

Views: 288

Answers (1)

Gary Russell

Reputation: 174494

Something must have changed in one of the layers; here is a workaround:

@Bean
SmartInitializingSingleton ktmProvider(BinderFactory binders, GenericApplicationContext context) {
    return () -> {
        context.registerBean("kafkaTransactionManager", KafkaTransactionManager.class,
                ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
                        .getTransactionalProducerFactory());
        context.getBean(KafkaTransactionManager.class).setTransactionIdPrefix("tx-test");
    };
}

That is, wait until the other beans have been created before registering and configuring the transaction manager.
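To see why deferring the registration helps, here is a rough plain-Java sketch of the idea. It is a toy model, not Spring's implementation: ToyContext, hasBean, afterSingletonsInstantiated and the bean names "listenerConfig" and "txManager" are all made up for illustration, and the assumption that the auto-configuration looks up the transaction manager only if one is registered is a simplification based on the two beans named in the error report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of deferred bean registration; all names here are hypothetical. */
public class DeferredRegistrationSketch {

    /** Minimal container that creates beans on demand and detects cycles. */
    static class ToyContext {
        private final Map<String, Function<ToyContext, Object>> factories = new HashMap<>();
        private final Map<String, Object> singletons = new HashMap<>();
        private final Deque<String> inCreation = new ArrayDeque<>();
        private final List<Runnable> afterSingletons = new ArrayList<>();

        void register(String name, Function<ToyContext, Object> factory) {
            factories.put(name, factory);
        }

        boolean hasBean(String name) {
            return factories.containsKey(name);
        }

        /** Callback run after all registered beans exist (loosely like SmartInitializingSingleton). */
        void afterSingletonsInstantiated(Runnable callback) {
            afterSingletons.add(callback);
        }

        Object getBean(String name) {
            if (singletons.containsKey(name)) {
                return singletons.get(name);
            }
            if (inCreation.contains(name)) {
                throw new IllegalStateException("cycle at " + name);
            }
            inCreation.push(name);
            try {
                Object bean = factories.get(name).apply(this);
                singletons.put(name, bean);
                return bean;
            } finally {
                inCreation.pop();
            }
        }

        void refresh() {
            // Create every bean registered so far, then run the deferred callbacks.
            for (String name : new ArrayList<>(factories.keySet())) {
                getBean(name);
            }
            afterSingletons.forEach(Runnable::run);
        }
    }

    // The config uses a transaction manager only if one is registered (assumed behaviour).
    static final Function<ToyContext, Object> LISTENER_CONFIG =
            c -> c.hasBean("txManager") ? "config+" + c.getBean("txManager") : "config";

    // The transaction manager needs the rest of the context, including the config.
    static final Function<ToyContext, Object> TX_MANAGER =
            c -> "tm(" + c.getBean("listenerConfig") + ")";

    /** Registering both beans up front reproduces the cycle. */
    public static boolean eagerFails() {
        ToyContext ctx = new ToyContext();
        ctx.register("listenerConfig", LISTENER_CONFIG);
        ctx.register("txManager", TX_MANAGER);
        try {
            ctx.refresh();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Registering the transaction manager from the callback avoids the cycle. */
    public static Object deferredWorks() {
        ToyContext ctx = new ToyContext();
        ctx.register("listenerConfig", LISTENER_CONFIG);
        ctx.afterSingletonsInstantiated(() -> ctx.register("txManager", TX_MANAGER));
        ctx.refresh();
        return ctx.getBean("txManager");
    }

    public static void main(String[] args) {
        System.out.println(eagerFails());    // prints "true"
        System.out.println(deferredWorks()); // prints "tm(config)"
    }
}
```

Note that in this toy model the config is built without the transaction manager, because the manager does not exist yet when the config is created. Whether that matters in a real application depends on whether anything created during startup needs the manager.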

Upvotes: 1
