JohnD

Reputation: 455

Graceful shutdown kafka consumer

I'm using spring-kafka 3.1.6 and hazelcast 3.7.4.

When my application shuts down, Hazelcast seems to shut down before my Kafka consumer, which leads to these errors:

nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!

and

org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:422)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)

My consumer is configured with a RetryTemplate.

org.springframework.context.support.DefaultLifecycleProcessor   Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

Can someone help me shut down gracefully by making Hazelcast wait for the Kafka container to stop?

Thanks

Edit

Both KafkaContainer and HazelcastInstance are beans handled by spring.

HazelcastInstance is created manually (not using the default Spring Boot auto-configuration):

@PreDestroy
public void destroy() {
    LOGGER.info("Closing Hazelcast");
    Hazelcast.shutdownAll();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    LOGGER.debug("Configuring Hazelcast");
    Config config = new Config();
    ...

And the Kafka container factory:

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Key, Value> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Key, Value> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    factory.getContainerProperties().setErrorHandler(new DefaultErrorHandler());
    return factory;
}

Edit 2:

I'm running three replicas of the application. All three replicas were shut down at the same moment.

Here is the shutdown log history for one replica:

1) 18:14:54.427 The "ContextClosedEvent" Spring event was caught by a component and logged

2) 18:14:54.441 org.apache.kafka.common.errors.WakeupException: null

3) 18:14:55.951 com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!

4) Step 3) repeats 19 times from 18:14:55.951 to 18:15:23.103

5) 18:15:24.441 Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

6) 18:15:24.537 Closing Hazelcast

Upvotes: 0

Views: 3701

Answers (2)

Clover

Reputation: 549

This might be the wrong thread for a client-based setup, but for an embedded setup, here is what worked for me.

Set this property to false:

hazelcast.shutdownhook.enabled = false

The documentation describes the property as follows:

hazelcast.shutdownhook.enabled (default: true)

Enable Hazelcast shutdownhook thread. When this is enabled, this thread terminates the Hazelcast instance without waiting to shutdown gracefully.

Disabling it kept Hazelcast from being killed forcefully, but I had to call hazelcastInstance.shutdown() explicitly.
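As a rough sketch of one way to apply this setting: I am assuming here that Hazelcast also picks the flag up from JVM system properties (the same effect as passing -Dhazelcast.shutdownhook.enabled=false on the command line) and that it is set before the Hazelcast instance is created. The class and method names below are made up for illustration.

```java
/**
 * Sketch only: turns off the Hazelcast shutdown hook through a JVM system property.
 * Assumption: Hazelcast reads this flag from system properties when the instance starts,
 * so this must run before the instance is created.
 */
public final class HazelcastShutdownHookSetting {

    // Property name taken from the answer above.
    static final String SHUTDOWN_HOOK_PROPERTY = "hazelcast.shutdownhook.enabled";

    private HazelcastShutdownHookSetting() {
    }

    /** Hypothetical helper: call this early in startup, before building the Hazelcast Config. */
    static void disableShutdownHook() {
        System.setProperty(SHUTDOWN_HOOK_PROPERTY, "false");
    }

    public static void main(String[] args) {
        disableShutdownHook();
        System.out.println(SHUTDOWN_HOOK_PROPERTY + "=" + System.getProperty(SHUTDOWN_HOOK_PROPERTY));
    }
}
```

With the hook disabled, nothing stops Hazelcast on JVM exit any more, so the instance has to be shut down explicitly, for example from a @PreDestroy method like the one in the question.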

Upvotes: 2

mdogan

Reputation: 1949

If the Kafka consumer bean does not depend directly on the Hazelcast bean, you can define an order between the beans (at both initialization and destruction time) using the depends-on attribute of the bean definition. For example:

<bean id="kafka-consumer" class="KafkaConsumerClassName" depends-on="hazelcast"/>
<bean id="hazelcast" class="HazelcastInstance" />

In this case, the Hazelcast bean will be created and initialized before the Kafka consumer bean, and it will be destroyed after the Kafka consumer bean.
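To make the ordering rule concrete, here is a toy model in plain Java. It is not Spring code and does not reflect how Spring implements this internally; ToyContainer and its methods are invented purely to show the "created first, destroyed last" behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of depends-on ordering. This is an illustration, not Spring code. */
public final class DependsOnOrderSketch {

    /** Hypothetical container that records creation and destruction order. */
    static final class ToyContainer {
        private final Deque<String> created = new ArrayDeque<>();
        final List<String> log = new ArrayList<>();

        /** A dependency must be created before the bean that depends on it. */
        void create(String beanName) {
            created.push(beanName);
            log.add("create " + beanName);
        }

        /** Destroys beans in reverse creation order, so dependents go first. */
        void close() {
            while (!created.isEmpty()) {
                log.add("destroy " + created.pop());
            }
        }
    }

    static List<String> run() {
        ToyContainer container = new ToyContainer();
        container.create("hazelcast");       // the dependency comes first
        container.create("kafka-consumer");  // declared with depends-on="hazelcast"
        container.close();
        return container.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it lists hazelcast as created first and destroyed last, which is the order you want so that the consumer never sees an inactive Hazelcast instance.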

See the "Using depends-on" section in the Spring reference manual.

Upvotes: 1
