Reputation: 41
I have a requirement where I want to start a Kafka consumer manually.
Code:
class Dummy implements ConsumerSeekAware {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    CountDownLatch latch;

    @Autowired
    ConcurrentKafkaListenerContainerFactory factory;

    // intended to run when the container reports that it is idle
    void onIdleEvent() {
        latch.countDown();
    }

    @KafkaListener(id = "myContainer", topics = "mytopic", autoStartup = "false")
    public void listen() {}

    @Scheduled(cron = " some time ")
    void do_some_consumption() throws InterruptedException {
        latch = new CountDownLatch(1);
        this.registry.getListenerContainer("myContainer").start();
        latch.await();
        // do processing
        this.registry.getListenerContainer("myContainer").stop();
    }
}
I have defined the ConcurrentKafkaListenerContainerFactory bean, with all its properties, in a separate Config class, and I am autowiring it here.
However, I get a NullPointerException when I start my container using this.registry.getListenerContainer("myContainer").start():
java.lang.NullPointerException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 1
Views: 2818
Reputation: 174769
I just copied your code into a Spring Boot app (which auto-configures the factories), and everything works as expected:
@SpringBootApplication
@EnableScheduling
public class So62412316Application {

    public static void main(String[] args) {
        SpringApplication.run(So62412316Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("mytopic", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("mytopic").partitions(1).replicas(1).build();
    }

}

@Component
class Dummy implements ConsumerSeekAware {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    CountDownLatch latch;

    @Autowired
    ConcurrentKafkaListenerContainerFactory factory;

    @EventListener
    public void onIdleEvent(ListenerContainerIdleEvent event) {
        System.out.println(event);
        latch.countDown();
    }

    @KafkaListener(id = "myContainer", topics = "mytopic", autoStartup = "false")
    public void listen(String in) {
        System.out.println(in);
    }

    @Scheduled(initialDelay = 5_000, fixedDelay = 60_000)
    void do_some_consumption() throws InterruptedException {
        latch = new CountDownLatch(1);
        this.registry.getListenerContainer("myContainer").start();
        latch.await();
        this.registry.getListenerContainer("myContainer").stop();
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000
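If the NPE persists in your own app, one thing worth ruling out is the lookup itself: getListenerContainer(id) returns null when no container is registered under that id, so chaining .start() onto it would throw. Below is a minimal, plain-Java sketch of a guarded start/await/stop cycle. It deliberately does not use Spring: Container, REGISTRY and consumeOnce are hypothetical stand-ins invented for illustration, and the timeout on await is my own assumption, added so the scheduler thread cannot block forever if the idle event never arrives.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GuardedStartSketch {

    /** Hypothetical stand-in for a listener container; not a Spring type. */
    interface Container {
        void start();
        void stop();
    }

    /** Hypothetical stand-in for the registry: get() yields null for an unknown id. */
    static final Map<String, Container> REGISTRY = new ConcurrentHashMap<>();

    /**
     * Looks up the container, fails with a clear message if it is missing,
     * then starts it, waits for the idle signal (or the timeout) and stops it.
     * Returns true if the idle signal arrived before the timeout.
     */
    static boolean consumeOnce(String id, CountDownLatch idle, long timeoutMs)
            throws InterruptedException {
        Container container = REGISTRY.get(id);
        if (container == null) {
            throw new IllegalStateException(
                    "No listener container registered with id '" + id + "'");
        }
        container.start();
        try {
            return idle.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            container.stop(); // always stop, even on timeout or interrupt
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch idle = new CountDownLatch(1);
        REGISTRY.put("myContainer", new Container() {
            public void start() { idle.countDown(); } // pretend the idle event fires at once
            public void stop() { }
        });
        System.out.println(consumeOnce("myContainer", idle, 1000));
    }
}
```

In the real component the same shape would apply to the value returned by this.registry.getListenerContainer("myContainer"): check it for null first, and if it is null, look at whether the class with the @KafkaListener method is actually registered as a bean.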
Upvotes: 2