Reputation: 89
I'm developing a Spring Integration app that has a Kafka outbound channel adapter, and I'm configuring the flows with the Spring Integration Java DSL.
I have configured the message handler spec as in the following snippet:
KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
        .outboundChannelAdapter()
        .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                new StringSerializer(), new ByteArraySerializer()), "localhost:9092");
I want to add a ProducerListener. This capability was added to Spring Integration Kafka in this pull request: https://github.com/spring-projects/spring-integration-kafka/pull/80
How can I add the ProducerListener using the Java DSL?
Thanks.
Upvotes: 1
Views: 605
Reputation: 121542
Well, this sounds like a good improvement to the SI Java DSL for SI Kafka 1.3. Right now the DSL is compatible with SI Kafka 1.2.x.
While we figure out a fix (feel free to raise a GitHub issue for it), here is a workaround for you:
KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
        .outboundChannelAdapter()
        .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                new StringSerializer(), new ByteArraySerializer()), "localhost:9092");
// The cast assumes the first component the spec registers is the KafkaProducerContext.
KafkaProducerContext kafkaProducerContext =
        (KafkaProducerContext) messageHandlerSpec.getComponentsToRegister().iterator().next();
// Look up the producer configuration for the topic and attach the listener to it.
ProducerConfiguration<?, ?> producerConfiguration = kafkaProducerContext.getTopicConfiguration(topicName);
producerConfiguration.setProducerListener(myProducerListener);
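If it helps, here is a rough sketch of the kind of object that could play the role of myProducerListener. It does not use the real SI Kafka types: SendCallbacks is a local stand-in I made up so the example compiles on its own, and CountingListener and its method names are hypothetical too. The actual ProducerListener interface from the pull request may have different method signatures, so adapt it to whatever your version declares.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ProducerListenerSketch {

    // Local stand-in for the library's listener contract (an assumption, not the real API).
    public interface SendCallbacks<K, V> {
        void onSuccess(String topic, K key, V value);
        void onError(String topic, K key, V value, Exception cause);
    }

    // Hypothetical listener that counts send outcomes and logs failures.
    // Atomic counters are used because callbacks may arrive on a different
    // thread than the one that sent the message.
    public static final class CountingListener<K, V> implements SendCallbacks<K, V> {
        private final AtomicLong successes = new AtomicLong();
        private final AtomicLong failures = new AtomicLong();

        @Override
        public void onSuccess(String topic, K key, V value) {
            successes.incrementAndGet();
        }

        @Override
        public void onError(String topic, K key, V value, Exception cause) {
            failures.incrementAndGet();
            System.err.println("Send to " + topic + " failed for key " + key + ": " + cause.getMessage());
        }

        public long successes() {
            return successes.get();
        }

        public long failures() {
            return failures.get();
        }
    }

    public static void main(String[] args) {
        CountingListener<String, byte[]> listener = new CountingListener<>();
        // Simulate one successful and one failed send.
        listener.onSuccess("demo-topic", "k1", new byte[] {1});
        listener.onError("demo-topic", "k2", new byte[] {2}, new IllegalStateException("broker unavailable"));
        System.out.println(listener.successes() + " ok, " + listener.failures() + " failed");
    }
}
```

In a real application you would implement the library's own listener interface instead of the stand-in and pass that instance to setProducerListener as in the workaround.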
Upvotes: 0