amaroqz

Reputation: 73

Kafka Consumer: Stop processing messages when exception was raised

I'm a bit confused about the poll() behaviour of (Spring) Kafka when/after the ConcurrentMessageListenerContainer is stopped.

What I want to achieve: stop the consumer after an exception is raised (for example, when a message could not be saved to the database), do not commit the offset, restart the consumer after a given time, and resume processing from the previously failed message.

I read this issue (https://github.com/spring-projects/spring-kafka/issues/451), which says that the container will still call the listener with the remaining records from the current poll. That means a later record from the same batch can still be processed successfully and commit its offset past the failed record (for example, if the record at offset 5 fails but offset 6 succeeds, the commit moves past 5 and the failed record is never reprocessed). This could end up in lost/skipped messages.

Is this really the case, and if so, is there a way to solve it without upgrading to newer versions? (A DLQ is not a solution in my case.)

What I already did: configured an error handler via setErrorHandler() and called setAckOnError(false), as shown below.

private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps, Class<?> keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    //Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
        props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
        props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
    }

    return props;
}
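
For reference, a minimal sketch of how these properties can be wired into a consumer factory (StringDeserializer as the key deserializer is only an assumption for illustration):

DefaultKafkaConsumerFactory<String, byte[]> consumerFactory =
        new DefaultKafkaConsumerFactory<>(getConsumerProps(kafkaProps, StringDeserializer.class));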

Consumer

public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
    StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();

    ContainerProperties containerProperties = new ContainerProperties(...);
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    containerProperties.setAckOnError(false);
    containerProperties.setErrorHandler(stoppingErrorHandler);

    ConcurrentMessageListenerContainer<String, byte[]> container = ...
    container.setConcurrency(1); // use only one consumer thread (a single child container)
    stoppingErrorHandler.setConcurrentMessageListenerContainer(container);

    return container;
}
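
The two constructions elided above ("...") typically take a shape like the following sketch; the topic name, listener body and consumerFactory are placeholders/assumptions for illustration only:

ContainerProperties containerProperties = new ContainerProperties("some-topic"); // placeholder topic name
containerProperties.setMessageListener((MessageListener<String, byte[]>) record -> {
    // application-specific processing of each ConsumerRecord
});

ConcurrentMessageListenerContainer<String, byte[]> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);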

Error Handler

public class StoppingErrorHandler implements ErrorHandler {

    @Setter
    private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;

    @Value("${backends.kafka.consumer.halt.timeout}")
    int consumerHaltTimeout;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        if (concurrentMessageListenerContainer != null) {
            // stop the container so that no further records are consumed
            concurrentMessageListenerContainer.stop();
        }

        // schedule a restart after the configured halt timeout
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
                    concurrentMessageListenerContainer.start();
                }
            }
        }, consumerHaltTimeout);
    }
}

What I'm using:

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.7.RELEASE</version>
  </dependency>

Upvotes: 6

Views: 8358

Answers (1)

Gary Russell

Reputation: 174799

without upgrading to newer versions?

2.1 introduced the ContainerStoppingErrorHandler, which is a ContainerAwareErrorHandler; the remaining unconsumed messages from the poll are discarded (and will be re-fetched when the container is restarted).
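
For reference, wiring that in on 2.1+ looks roughly like this (a minimal sketch, assuming a placeholder topic name and otherwise the same container setup as in the question):

ContainerProperties containerProperties = new ContainerProperties("some-topic"); // placeholder topic
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
// stops the container on any listener exception; the rest of the current
// poll is discarded and re-fetched from the last committed offset on restart
containerProperties.setErrorHandler(new ContainerStoppingErrorHandler());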

With earlier versions, your listener will need to reject (fail) the remaining messages in the batch (or set max.poll.records=1).
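
A minimal sketch of that listener-side rejection for the pre-2.1 versions used in the question; process(...) is a placeholder for the real work (e.g. the database save). Since offsets are committed per partition, only records that follow the failed one on the same partition can skip it, so only those are rejected until the failed record is redelivered after the restart:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.MessageListener;

public class RejectingMessageListener implements MessageListener<String, byte[]> {

    private volatile TopicPartition failedPartition;
    private volatile long failedOffset = -1L;

    @Override
    public void onMessage(ConsumerRecord<String, byte[]> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        if (tp.equals(failedPartition)) {
            if (record.offset() == failedOffset) {
                // redelivery of the failed record after the restart: retry it
                failedPartition = null;
                failedOffset = -1L;
            } else {
                // remaining record from the batch that contained the failure:
                // reject it so its offset is never committed
                throw new IllegalStateException(
                        "Rejecting offset " + record.offset() + " after failure at offset " + failedOffset);
            }
        }
        try {
            process(record);
        } catch (RuntimeException e) {
            failedPartition = tp;
            failedOffset = record.offset();
            throw e; // let the StoppingErrorHandler stop the container
        }
    }

    private void process(ConsumerRecord<String, byte[]> record) {
        // application-specific processing (placeholder)
    }
}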

Upvotes: 2
