jpt

Reputation: 21

app is alive but stops consuming messages after a while

My spring-kafka consumer stops consuming messages after a while. The stoppage happens every time, but never after the same duration. When the app is no longer consuming, the end of the log always shows a statement that the consumer sent a LEAVE_GROUP signal. If I am not seeing any errors or exceptions, why is the consumer leaving the group?

org.springframework.boot:spring-boot-starter-parent:2.0.4.RELEASE
spring-kafka:2.1.8.RELEASE
org.apache.kafka:kafka-clients:1.0.2

I've set logging as:

logging.level.org.apache.kafka=DEBUG
logging.level.org.springframework.kafka=INFO

Other settings

spring.kafka.listener.concurrency=5
spring.kafka.listener.type=single
spring.kafka.listener.ack-mode=record
spring.kafka.listener.poll-timeout=10000
spring.kafka.consumer.heartbeat-interval=5000
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.fetch-max-wait=10000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.security.protocol=SSL
spring.kafka.consumer.retry.maxAttempts=3
spring.kafka.consumer.retry.backoffperiod.millisecs=2000

ContainerFactory setup

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> recordsKafkaListenerContainerFactory(RetryTemplate retryTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(listenerCount);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        factory.getContainerProperties().setPollTimeout(pollTimeoutMillis);
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        factory.getContainerProperties().setAckOnError(false);
        factory.setRetryTemplate(retryTemplate);
        factory.setStatefulRetry(true);
        factory.getContainerProperties().setIdleEventInterval(60000L);

        return factory;
    }

Listener configuration

@Component
public class RecordsEventListener implements ConsumerSeekAware {

    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(RecordsEventListener.class);

    private boolean isReplay;
    private boolean partitonSeekToBeginningDone = false;

    // Spring Retry classifier deciding which exceptions are retryable; its wiring is omitted here
    private BinaryExceptionClassifier binaryExceptionClassifier;
    @Value("${mode.replay:false}")
    public void setModeReplay(boolean enabled) {

        this.isReplay = enabled;
    }

    @KafkaListener(topics = "${event.topic}", containerFactory = "recordsKafkaListenerContainerFactory")
    public void handleEvent(@Payload String payload) throws RecordsEventListenerException {

        try {
           //business logic
        } catch (Exception e) {
            LOG.error("Process error for event: {}", payload, e);

            if (isRetryableException(e)) {
                LOG.warn("Retryable exception detected. Going to retry.");
                throw new RecordsEventListenerException(e);
            } else {
                LOG.warn("Dropping event because of non-retryable exception.");
            }
        }
        }

    }

    private boolean isRetryableException(Exception e) {
        return binaryExceptionClassifier.classify(e);
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        //do nothing
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

        //do this only once per start of app
        if (isReplay && !partitonSeekToBeginningDone) {
            assignments.forEach((t, p) -> callback.seekToBeginning(t.topic(), t.partition()));
            partitonSeekToBeginningDone = true;
        }

    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Container is IDLE; no messages to pull.");
        assignments.forEach((t, p) -> LOG.info("Topic: {}, Partition: {}, Offset: {}", t.topic(), t.partition(), p));
    }

    boolean isPartitionSeekToBeginningDone() {

        return partitonSeekToBeginningDone;
    }

    void setPartitonSeekToBeginningDone(boolean partitonSeekToBeginningDone) {

        this.partitonSeekToBeginningDone = partitonSeekToBeginningDone;
    }
}
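
The binaryExceptionClassifier used in the listener comes from Spring Retry and its configuration isn't shown above. As a minimal, stdlib-only sketch of the same classify-by-type idea (the class name and the retryable set here are hypothetical, not the actual wiring):

```java
import java.util.Set;

// Sketch of what Spring Retry's BinaryExceptionClassifier does for the
// listener above: an exception counts as retryable if its type (or any
// of its supertypes) is in a configured set.
public class RetryClassifier {

    private final Set<Class<? extends Exception>> retryable;

    public RetryClassifier(Set<Class<? extends Exception>> retryable) {
        this.retryable = retryable;
    }

    public boolean classify(Exception e) {
        // Walk up the class hierarchy so subclasses of a retryable type match too.
        for (Class<?> c = e.getClass(); c != null; c = c.getSuperclass()) {
            if (retryable.contains(c)) {
                return true;
            }
        }
        return false;
    }
}
```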

When the app is no longer consuming, the end of the log always shows the consumer sending the LEAVE_GROUP signal:

2019-05-02 18:31:05.770 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Sending Heartbeat request to coordinator x.x.x.com:9093 (id: 2147482638 rack: null)
2019-05-02 18:31:05.770 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=app] Using older server API v0 to send HEARTBEAT {group_id=app,generation_id=6,member_id=consumer-1-98d28e69-b0b9-4c2b-82cd-731e53b74b87} with correlation id 5347 to node 2147482638
2019-05-02 18:31:05.872 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Received successful Heartbeat response
2019-05-02 18:31:10.856 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Sending Heartbeat request to coordinator x.x.x.com:9093 (id: 2147482638 rack: null)
2019-05-02 18:31:10.857 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=app] Using older server API v0 to send HEARTBEAT {group_id=app,generation_id=6,member_id=consumer-1-98d28e69-b0b9-4c2b-82cd-731e53b74b87} with correlation id 5348 to node 2147482638
2019-05-02 18:31:10.958 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Received successful Heartbeat response
2019-05-02 18:31:11.767 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Sending LeaveGroup request to coordinator x.x.x.com:9093 (id: 2147482638 rack: null)
2019-05-02 18:31:11.767 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=app] Using older server API v0 to send LEAVE_GROUP {group_id=app,member_id=consumer-1-98d28e69-b0b9-4c2b-82cd-731e53b74b87} with correlation id 5349 to node 2147482638
2019-05-02 18:31:11.768 DEBUG 9548 --- [kafka-coordinator-heartbeat-thread | app] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=app] Disabling heartbeat thread

Full log

Upvotes: 1

Views: 2321

Answers (1)

jpt

Reputation: 21

Thanks to all who replied. It turned out the broker was indeed dropping the consumer on session timeout. The broker was a very old version (0.10.0.1) that did not support the newer KIP-62 features the spring-kafka version we used could take advantage of. Since we could not dictate a broker upgrade or a change to the session timeout, we simply modified our processing logic so the work finished within the session timeout.
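
For anyone hitting the same pattern: with a pre-KIP-62 broker, processing of each fetched batch effectively has to finish within the session timeout. A hedged sketch of the configuration levers (the values below are illustrative, not what we used):

```properties
# Fewer records per poll() means each processing pass finishes sooner,
# keeping the consumer inside the session timeout.
spring.kafka.consumer.max-poll-records=10
# If broker/ops policy allows it (it did not in our case), raising the
# session timeout is the other lever:
spring.kafka.consumer.properties.session.timeout.ms=60000
```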

Upvotes: 1
