Fletch

Reputation: 907

How to specify multiple topics in separate config properties for one Kafka listener?

I would like to create a Spring Boot application that reads from several Kafka topics. I realise I can put a comma-separated list of topics in my application.properties; however, I would like the topic names to be listed separately, both for readability and so that I can use each topic name to work out how to process the message.

I've found the following questions, but they all have the topics listed as a single comma-separated value:

Consume multiple topics in one listener in spring boot kafka

Using multiple topic names with KafkaListener annotation

Enabling @KafkaListener to take in variable topic names from application.yml file

Pass array list of topic names to @KafkaListener

The closest I've come is with the following:

application.properties

kafka.topic1=topic1
kafka.topic2=topic2

KafkaConsumer

@KafkaListener(topics = "#{'${kafka.topic1}'},#{'${kafka.topic2}'}")
public void receive(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
      @Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
      @Payload(required = false) String payload) throws IOException {
}

This gives the error:

Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]

I realise I need it to be {"topic1", "topic2"} but I can't work out how.

Having the annotation @KafkaListener(topics = "#{'${kafka.topic1}'}") correctly subscribes to the first topic. And if I change it to @KafkaListener(topics = "#{'${kafka.topic2}'}") I can correctly subscribe to the second topic.

It's just creating the array of topics in the annotation that I can't fathom.

Any help would be wonderful.

Upvotes: 3

Views: 13475

Answers (2)

Rorschach

Reputation: 111

If you have your topics configured as a comma-separated list, like:

kafka.topics = topic1,topic2

In this case you can simply use:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
void listen(){}
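To see why this works where the attempt in the question fails, here is a rough plain-Java sketch (no Spring involved) of what the two expressions end up producing. The concatenated form in the question resolves to the single string "topic1,topic2", which Kafka rejects because a comma is not a legal character in a topic name, whereas split(',') yields a real String[]. The class and method names are made up for illustration, and the trimming variant is an extra of mine rather than something the annotation does for you.

```java
import java.util.Arrays;

// Plain-Java illustration only; TopicSplitSketch and its methods are hypothetical names.
class TopicSplitSketch {

    // Mimics what the SpEL expression '${kafka.topics}'.split(',') evaluates to
    // once the placeholder has been resolved to a string such as "topic1,topic2".
    static String[] splitTopics(String resolvedProperty) {
        return resolvedProperty.split(",");
    }

    // Variant that tolerates spaces around the commas, e.g. "topic1, topic2".
    static String[] splitAndTrim(String resolvedProperty) {
        return Arrays.stream(resolvedProperty.split(","))
                .map(String::trim)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // One element per topic, instead of the single string "topic1,topic2".
        System.out.println(Arrays.toString(splitTopics("topic1,topic2")));
        System.out.println(Arrays.toString(splitAndTrim("topic1, topic2")));
    }
}
```

If the property value may contain spaces after the commas, the plain split would leave them in the topic names, so trimming (or keeping the property free of spaces) is worth considering.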

Upvotes: 3

Gary Russell

Reputation: 174484

@KafkaListener(id = "so71497475", topics = { "${kafka.topic1}", "${kafka.topic2}" })
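Since the question also mentions using the topic name to decide how to process each message, one possible way to do that is a small lookup from topic name to handler. This is only a sketch in plain Java, not part of Spring Kafka: TopicDispatcherSketch, register and dispatch are hypothetical names, and the handlers return a String purely so the behaviour is easy to check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not a Spring Kafka class: maps a topic name to the
// function that should process payloads received from that topic.
class TopicDispatcherSketch {

    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Associate a handler with one topic name (e.g. the value of kafka.topic1).
    void register(String topic, Function<String, String> handler) {
        handlers.put(topic, handler);
    }

    // Look up the handler for the topic the record came from and apply it.
    String dispatch(String topic, String payload) {
        Function<String, String> handler = handlers.get(topic);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for topic: " + topic);
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        TopicDispatcherSketch dispatcher = new TopicDispatcherSketch();
        dispatcher.register("topic1", payload -> "handled by first handler: " + payload);
        dispatcher.register("topic2", payload -> "handled by second handler: " + payload);
        System.out.println(dispatcher.dispatch("topic2", "hello"));
    }
}
```

In the listener method, the topic argument taken from the KafkaHeaders.RECEIVED_TOPIC header (as in the question) would be passed to dispatch along with the payload. The topic names used in register would presumably come from the same kafka.topic1 and kafka.topic2 properties.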

EDIT

And this is a more sophisticated technique which would allow you to add more topics without changing any code:

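import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

@SpringBootApplication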
@EnableConfigurationProperties
public class So71497475Application {

    public static void main(String[] args) {
        SpringApplication.run(So71497475Application.class, args);
    }

    @KafkaListener(id = "so71497475", topics = "#{@myProps.kafkaTopics}")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean // This will add the topics to the broker if not present
    KafkaAdmin.NewTopics topics(MyProps props) {
        return new KafkaAdmin.NewTopics(props.getTopics().stream()
                .map(t -> TopicBuilder.name(t).partitions(1).replicas(1).build())
                .toArray(size -> new NewTopic[size]));
    }

}

@ConfigurationProperties("my.kafka")
@Component
class MyProps {

    private List<String> topics = new ArrayList<>();

    public List<String> getTopics() {
        return this.topics;
    }

    public void setTopics(List<String> topics) {
        this.topics = topics;
    }

    public String[] getKafkaTopics() {
        return this.topics.toArray(new String[0]);
    }

}

application.properties

my.kafka.topics[0]=topic1
my.kafka.topics[1]=topic2
my.kafka.topics[2]=topic3

Log output

so71497475: partitions assigned: [topic1-0, topic2-0, topic3-0]
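For anyone wondering how the indexed keys turn into the array the listener receives, the following plain-Java sketch mimics the observable result: keys of the form prefix[0], prefix[1], ... are collected in order into a String[]. It is not how Spring Boot's binder is implemented, and IndexedTopicsSketch and readTopics are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustration only: Spring Boot binds my.kafka.topics[n] to the List in MyProps
// for you. This class just imitates the end result with java.util.Properties.
class IndexedTopicsSketch {

    // Collects prefix[0], prefix[1], ... until the first missing index.
    static String[] readTopics(Properties props, String prefix) {
        List<String> topics = new ArrayList<>();
        for (int i = 0; ; i++) {
            String value = props.getProperty(prefix + "[" + i + "]");
            if (value == null) {
                break; // no more indexed entries
            }
            topics.add(value);
        }
        return topics.toArray(new String[0]);
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "my.kafka.topics[0]=topic1\n"
              + "my.kafka.topics[1]=topic2\n"
              + "my.kafka.topics[2]=topic3\n"));
        for (String topic : readTopics(props, "my.kafka.topics")) {
            System.out.println(topic);
        }
    }
}
```

The practical upshot is the same as in the answer: adding a fourth topic is a matter of adding my.kafka.topics[3] to the properties file, with no change to the listener code.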

Upvotes: 6
