Reputation: 455
I'm using spring-kafka 3.1.6 and hazelcast 3.7.4.
When my application shuts down, Hazelcast seems to shut down before my Kafka consumer, which leads to errors such as:
nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
and
org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:422)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
My consumer is configured with a RetryTemplate.
org.springframework.context.support.DefaultLifecycleProcessor Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
Can someone help me achieve a graceful shutdown by making Hazelcast wait for the Kafka container to stop?
Thanks
Edit
Both the Kafka container and the HazelcastInstance are beans managed by Spring.
The HazelcastInstance is created manually (not using the default Spring Boot autoconfiguration):
@PreDestroy
public void destroy() {
    LOGGER.info("Closing Hazelcast");
    Hazelcast.shutdownAll();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    LOGGER.debug("Configuring Hazelcast");
    Config config = new Config();
    ...
And the Kafka container:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Key, Value> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Key, Value> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    factory.getContainerProperties().setErrorHandler(new DefaultErrorHandler());
    return factory;
}
Edit 2:
I'm running three replicas of the application. All three replicas were shut down at the same moment.
Here is the shutdown log history for one replica:
1) 18:14:54.427 The "ContextClosedEvent" Spring event was caught by a component and logged
2) 18:14:54.441 org.apache.kafka.common.errors.WakeupException: null
3) 18:14:55.951 com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
4) Entry 3) repeats 19 times between 18:14:55.951 and 18:15:23.103
5) 18:15:24.441 Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
6) 18:15:24.537 Closing Hazelcast
Upvotes: 0
Views: 3701
Reputation: 549
This might be the wrong thread for a client-based setup, but for an embedded setup, here is what worked for me.
Set this property to false:
hazelcast.shutdownhook.enabled = false
The property is described as follows:
hazelcast.shutdownhook.enabled (default: true)
Enable Hazelcast shutdownhook thread. When this is enabled, this thread terminates the Hazelcast instance without waiting to shutdown gracefully.
Disabling it kept Hazelcast from being killed forcefully, but I had to call hazelcastInstance.shutdown() explicitly.
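One way to apply this setting from code is sketched below. It assumes Hazelcast reads the property from the JVM system properties when the instance is created, so the call has to happen before the HazelcastInstance bean is built. The class and method names are made up for illustration. Passing -Dhazelcast.shutdownhook.enabled=false on the command line should have the same effect.

```java
public final class HazelcastShutdownHookSetting {

    // Property name taken from the answer above.
    static final String SHUTDOWN_HOOK_PROPERTY = "hazelcast.shutdownhook.enabled";

    private HazelcastShutdownHookSetting() {
    }

    /**
     * Hypothetical helper: disables the Hazelcast JVM shutdown hook.
     * Assumption: this runs before any Hazelcast instance is created
     * in the same JVM, otherwise the setting is read too late.
     */
    static void disableShutdownHook() {
        System.setProperty(SHUTDOWN_HOOK_PROPERTY, "false");
    }

    public static void main(String[] args) {
        disableShutdownHook();
        // Print the effective value so the setting can be checked at startup.
        System.out.println(SHUTDOWN_HOOK_PROPERTY + "="
                + System.getProperty(SHUTDOWN_HOOK_PROPERTY));
    }
}
```

With the hook disabled, nothing stops Hazelcast on JVM exit automatically, so an explicit shutdown (for example the @PreDestroy method from the question) is still needed.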
Upvotes: 2
Reputation: 1949
If the Kafka consumer bean does not depend directly on the Hazelcast bean, you can define an order between the beans (at both initialization and destruction time) using the depends-on attribute of the bean definitions. For example:
<bean id="kafka-consumer" class="KafkaConsumerClassName" depends-on="hazelcast"/>
<bean id="hazelcast" class="HazelcastInstance" />
In this case, the Hazelcast bean will be created and initialized before the Kafka consumer bean, and it will be destroyed after the Kafka consumer bean.
See the "Using depends-on" section in the Spring reference manual.
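To make the ordering rule concrete, here is a toy model in plain Java. It is not Spring code: ToyContext is a made-up stand-in that only mimics "create the dependency first, destroy in reverse order". In Java-based configuration the same intent is usually expressed with Spring's @DependsOn("hazelcast") annotation on the dependent bean.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DependsOnOrderingDemo {

    /** Hypothetical stand-in for a container that remembers creation order. */
    static final class ToyContext {
        private final Deque<String> created = new ArrayDeque<>();
        private final List<String> events = new ArrayList<>();

        void create(String beanName) {
            events.add("init " + beanName);
            created.push(beanName); // most recently created bean sits on top
        }

        void close() {
            // Destroy in reverse creation order: dependents before dependencies.
            while (!created.isEmpty()) {
                events.add("destroy " + created.pop());
            }
        }

        List<String> events() {
            return events;
        }
    }

    /** Simulates the two beans from the XML above and returns the event log. */
    static List<String> run() {
        ToyContext ctx = new ToyContext();
        ctx.create("hazelcast");      // dependency is created first
        ctx.create("kafka-consumer"); // declared with depends-on="hazelcast"
        ctx.close();
        return ctx.events();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The event log shows the Kafka consumer being destroyed before Hazelcast, which is the order the question asks for.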
Upvotes: 1