Iqbal
Iqbal

Reputation: 73

How to dynamically create multiple consumers in Spring Kafka

I have two Kafka clusters, the IPs for which I am fetching dynamically from the database. I am using @KafkaListener for creating listeners. Now I want to create multiple Kafka listeners at runtime depending on the bootstrap-server attribute (comma-separated values), each one listening to a cluster. Can you please suggest how I can achieve this?

Spring-boot: 2.1.3.RELEASE Kafka-2.0.1 Java-8

Upvotes: 1

Views: 5389

Answers (2)

Valery Putnin
Valery Putnin

Reputation: 71

  1. Create a template class for future listeners:

     /** Reusable message handler: each dynamically created endpoint is bound to an instance of this class. */
     public class KafkaTemplateListener implements MessageListener<String, String> {
         @Override
         public void onMessage(ConsumerRecord<String, String> consumerRecord) {
             System.out.println("RECORD PROCESSING: " + consumerRecord);
         }
     }
    
  2. Create KafkaListenerEndpoint with the implemented template:

     @Service
     public class KafkaListenerCreator {
         // Consumer group id shared by every dynamically created listener.
         String kafkaGroupId = "kafkaGroupId";
         // Prefix for generated endpoint ids; a counter suffix keeps each id unique.
         String kafkaListenerId = "kafkaListenerId-";
         // Shared across instances so concurrently created endpoints never collide on id.
         static AtomicLong endpointIdIndex = new AtomicLong(1);
    
         /**
          * Builds a KafkaListenerEndpoint for the given topic, wired to a fresh
          * KafkaTemplateListener bean and its onMessage(ConsumerRecord) handler method.
          */
         private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
             MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                 createDefaultMethodKafkaListenerEndpoint(topic);
             kafkaListenerEndpoint.setBean(new KafkaTemplateListener());
             try {
                 kafkaListenerEndpoint.setMethod(KafkaTemplateListener.class
                     .getMethod("onMessage", ConsumerRecord.class));
             } catch (NoSuchMethodException e) {
                 // Keep the original exception as the cause instead of flattening it into the message.
                 throw new RuntimeException("Attempt to call a non-existent method", e);
             }
             return kafkaListenerEndpoint;
         }
    
         /** Creates an endpoint pre-populated with id, group, topic, and handler-method factory. */
         private MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String topic) {
             MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
             kafkaListenerEndpoint.setId(generateListenerId());
             kafkaListenerEndpoint.setGroupId(kafkaGroupId);
             kafkaListenerEndpoint.setAutoStartup(true);
             kafkaListenerEndpoint.setTopics(topic);
             kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
             return kafkaListenerEndpoint;
         }
    
         /** Returns a unique listener id such as "kafkaListenerId-1". */
         private String generateListenerId() {
             // BUG FIX: the original referenced an undeclared variable
             // "kafkaGeneralListenerEndpointId" and did not compile; the declared
             // prefix field is kafkaListenerId.
             return kafkaListenerId + endpointIdIndex.getAndIncrement();
         }
     }
    
  3. Register the endpoint with the KafkaListenerEndpointRegistry:

     @Service
     public class KafkaListenerCreator {
         //... fields from the previous example go here
    
         @Autowired
         private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
         @Autowired
         private KafkaListenerContainerFactory kafkaListenerContainerFactory;
    
         /** Builds an endpoint for the topic and registers it with the registry. */
         public void createAndRegisterListener(String topic) {
             KafkaListenerEndpoint endpoint = createKafkaListenerEndpoint(topic);
             // Final 'true' starts the new container immediately instead of waiting for a context refresh.
             kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, kafkaListenerContainerFactory, true);
         }
    
         //... methods from the previous example go here
    
     }
    

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174484

Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...

@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    /** Strong references to the prototype listener beans, keyed by consumer group id. */
    private final Map<String, MyListener> listeners = new HashMap<>();

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            int instance = 0;
            for (String cluster : props.getClusters()) {
                // Start from Boot's configured consumer properties, then point them at this cluster.
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance++;
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                // Mutate the shared factory before each prototype lookup; the container
                // built for that listener bean picks up the freshly set consumer factory.
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            }
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    /** Prints each record payload received from topic so55311070. */
    @KafkaListener(topics = "so55311070")
    public void listen(String payload) {
        System.out.println(payload);
    }

}

// Binds the comma-separated "kafka.clusters" property (e.g. kafka.clusters=host1:9092,host2:9093)
// to an array of bootstrap-server strings, one entry per cluster.
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    // Populated by Spring Boot's relaxed binding; null if the property is absent.
    private String[] clusters;

    public String[] getClusters() {
        return this.clusters;
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }

}
kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

Result

group0
group1
...
2019-03-23 11:43:25.993  INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994  INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions assigned: [so55311070-0]

EDIT

Add code to retry starting failed containers.

It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.

@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    /** True when at least one container failed to start and still needs a retry. */
    private boolean atLeastOneFailure;

    /** Handle on the periodic restart task, so it can be cancelled once everything runs. */
    private ScheduledFuture<?> restartTask;

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                // Clone Boot's consumer properties, retargeting them at this cluster.
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    // BUG FIX: reset the flag once per scheduled run, not once per container.
                    // Resetting inside the loop let a healthy later container erase an
                    // earlier container's failure, cancelling the retry task prematurely.
                    this.atLeastOneFailure = false;
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    // All containers are running now; stop retrying.
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    /**
     * Points the shared container factory at the given consumer properties and triggers
     * creation of a prototype listener; a broker that is down surfaces here as a
     * BeanCreationException, which marks the group for a later restart attempt.
     */
    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    /** Prints each record payload received from topic so55311070. */
    @KafkaListener(topics = "so55311070")
    public void listen(String payload) {
        System.out.println(payload);
    }

}

Upvotes: 4

Related Questions