Reputation: 603
I have a spring boot (v.1.57) application which uses Spring Cloud Stream (v1.3.0) and Kafka (v1.1.6). I want to be able to gracefully shut it down, i.e., when shutting down, all stream listeners (i.e., annotated with @StreamListener) should:
I noticed that there's a property called 'shutdownTimeout' in ContainerProperties (which is set to a default of 10000ms) so I've tried to modify it to 30000 by extending ConcurrentKafkaListenerContainerFactoryConfigurer class (Since it has a @ConditionalOnMissingBean annotation) via reflection like so:
@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {
@Autowired
private KafkaProperties kproperties;
@Override
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
myAccessor.setPropertyValue("properties", kproperties);
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
super.configure(listenerContainerFactory, consumerFactory);
containerProperties.setShutdownTimeout(30000);
}
}
But it wasn't successful. Also tried putting it (shutdownTimeout: 30000) in application.yml under the spring cloud stream binder settings, but again it didn't help.
Is there any way to control the shutdown process and achieve my goals?
Upvotes: 4
Views: 2522
Reputation: 174504
EDIT
It is no longer necessary to do this reflection hack; just add a ListenerContainerCustomizer
@Bean
to the application context. See here.
EDIT_END
spring-kafka 1.1.x is no longer supported; you should be using 1.3.9 with boot 1.5.x.
The current Boot 1.5.x version is 1.5.21.
You should upgrade immediately.
However, there a much newer versions of all of these projects.
Spring Cloud Stream doesn't use that factory, or the boot properties, to create its containers; it doesn't expose a mechanism to configure that property on the container.
Spring Cloud Stream 2.1 added the ListenerContainerCustomizer
which allows you to customize the binding container by setting any properties on it.
I suggest you upgrade to Boot 2.1.6 and Spring Cloud Stream Germantown (2.2.0).
EDIT
This is a bit of a hack, but it should work until you can upgrade to a newer stream release...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {
public static void main(String[] args) {
SpringApplication.run(So56883620Application.class, args).close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@StreamListener(Sink.INPUT)
public void listen(String in) throws InterruptedException {
this.latch.countDown();
System.out.println(in);
Thread.sleep(6_000);
System.out.println("exiting");
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
// wait for listener to start
this.latch.await(10, TimeUnit.SECONDS);
System.out.println("Shutting down");
};
}
@Bean
public SmartLifecycle bindingFixer(BindingService bindingService) {
return new SmartLifecycle() {
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public void stop() {
// no op
}
@Override
public void start() {
@SuppressWarnings("unchecked")
Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
.getPropertyValue("consumerBindings");
@SuppressWarnings("unchecked")
Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
.getPropertyValue("lifecycle.messageListenerContainer"))
.getContainerProperties().setShutdownTimeout(30_000L);
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void stop(Runnable callback) {
callback.run();
}
@Override
public boolean isAutoStartup() {
return true;
}
};
}
}
Upvotes: 2