Reputation: 155
I am setting up a basic Spring Cloud Stream producer with Kafka. The intent is to accept a HTTP POST, save the result of the post to a database with Spring Data JPA, and write the results to a Kafka topic using Spring Cloud Stream Kafka Binder. I am following the latest binder documentation on how to setup a KafkaTransactionManager, but this code results in an error on Application startup.
***************************
APPLICATION FAILED TO START
***************************
Description:
The dependencies of some of the beans in the application context form a cycle:
┌─────┐
| kafkaTransactionManager defined in com.example.tx.Application
↑ ↓
| org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration
└─────┘
I have the following Bean defined in my Application class, which is the same as documentation.
@Bean
public KafkaTransactionManager kafkaTransactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionIdPrefix("tx-test");
return tm;
}
It seems that calling getBinder
causes Spring to create the context again. How can I resolve this circular dependency?
Dependencies: Spring Boot parent 2.4.6; Spring Cloud BOM 2020.0.3
Upvotes: 0
Views: 288
Reputation: 174494
Something must have changed in one of the layers; here is a work around:
@Bean
SmartInitializingSingleton ktmProvider(BinderFactory binders, GenericApplicationContext context) {
return () -> {
context.registerBean("kafkaTransactionManager", KafkaTransactionManager.class,
((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
.getTransactionalProducerFactory());
context.getBean(KafkaTransactionManager.class).setTransactionIdPrefix("tx-test");
};
}
i.e. wait for the other beans to be created before registering and configuring the tm.
Upvotes: 1