Coffeelenko

Reputation: 1

Handling message reading errors in Reactive Kafka

How do you handle errors in Reactive Kafka? Is this possible only through Kafka configuration?

If an error is thrown while reading a message, the consumer stops. The consumer also stops if the message is never acknowledged.

My solution is to use reactive retry, but is that the right approach? If there are many messages, all failing messages will hang in the retry block, and if the application restarts, they will all be re-read at the same time, which produces a heavy load.

@Slf4j
public class AppConsumer<K, T> {
    public static Duration[] INTERVALS = {
            Duration.ofMinutes(1),
            Duration.ofMinutes(2),
            Duration.ofMinutes(20),
            Duration.ofHours(1),
            Duration.ofHours(6),
            Duration.ofHours(12)
    };
    public static Duration LAST_INTERVAL = INTERVALS[INTERVALS.length - 1];

    // retries with increasing delays from INTERVALS; once the table is
    // exhausted, every further retry waits LAST_INTERVAL
    public static Retry RETRY = Retry.from(companion ->
            companion
                    .flatMap(signal -> {
                        Duration interval = signal.totalRetries() < INTERVALS.length
                                ? INTERVALS[(int) signal.totalRetries()]
                                : LAST_INTERVAL;

                        log.error("Retry #{}, delay: {} s", (signal.totalRetries() + 1), interval.getSeconds());

                        return Mono.just(interval)
                                .delayElement(interval)
                                .map(d -> signal);
                    }));

    public <S> AppConsumer(String kfService, String topic, Class<S> keyDeser, MessageProcessor<K, T> processor) {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kfService);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeser);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        ReceiverOptions<K, T> options = ReceiverOptions.<K, T>create(consumerProps)
                .subscription(Collections.singleton(topic));

        KafkaReceiver.create(options)
                .receive()
                .publishOn(Schedulers.boundedElastic())
                // each record is processed and, on failure, retried per RETRY;
                // note: receiverOffset().acknowledge() is never called, so
                // offsets are never committed
                .flatMap(i -> processor
                        .process(i)
                        .retryWhen(RETRY))
                .subscribe();
    }
}
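For clarity, the `RETRY` policy above walks the `INTERVALS` table by retry number and then repeats the last entry indefinitely. A minimal stdlib sketch of just that schedule (the class and method names here are mine, not from the code above):

```java
import java.time.Duration;

// Sketch of the delay schedule implemented by the RETRY policy above:
// attempt n uses INTERVALS[n] while n is in range, then the last entry forever.
public class RetrySchedule {
    static final Duration[] INTERVALS = {
            Duration.ofMinutes(1),
            Duration.ofMinutes(2),
            Duration.ofMinutes(20),
            Duration.ofHours(1),
            Duration.ofHours(6),
            Duration.ofHours(12)
    };

    // totalRetries is 0-based, matching Reactor's RetrySignal#totalRetries()
    static Duration delayFor(long totalRetries) {
        return totalRetries < INTERVALS.length
                ? INTERVALS[(int) totalRetries]
                : INTERVALS[INTERVALS.length - 1];
    }

    public static void main(String[] args) {
        for (long n = 0; n < 8; n++) {
            System.out.println("retry #" + (n + 1) + " -> " + delayFor(n));
        }
    }
}
```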

The entire code can be viewed on GitHub.

Perhaps I have a wrong understanding of how to use Kafka and there are simpler ways to solve this problem. Please correct me where I am wrong and tell me what is best to use.

The ideal solution for me: when the consumer hits an error, it re-reads the message over and over with an exponential delay (preserving the remaining delay even across application restarts) until it is processed successfully. No message should be lost.
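One common way to make the delay survive restarts (which the code above does not do; this is a hypothetical sketch, not Reactor Kafka API) is to persist, per message, the attempt count and the next-attempt timestamp, e.g. in headers of a retry topic, and on startup compute the remaining wait from the wall clock. All names here are illustrative:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: capped exponential backoff whose state (attempt count,
// next-attempt time) could be persisted in retry-topic headers, so the delay
// is honored even after the application restarts.
public class PersistentBackoff {
    static final Duration BASE = Duration.ofMinutes(1);
    static final Duration CAP = Duration.ofHours(12);

    // delay for a 0-based attempt: BASE * 2^attempt, capped at CAP
    // (shift is clamped to avoid long overflow before the cap applies)
    static Duration delayFor(int attempt) {
        long seconds = BASE.getSeconds() << Math.min(attempt, 30);
        Duration d = Duration.ofSeconds(seconds);
        return d.compareTo(CAP) > 0 ? CAP : d;
    }

    // on (re)start: how long to still wait before retrying a persisted record
    static Duration remainingDelay(Instant nextAttemptAt, Instant now) {
        Duration left = Duration.between(now, nextAttemptAt);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        Instant scheduled = Instant.parse("2024-01-01T00:10:00Z");
        System.out.println(delayFor(3)); // PT8M
        System.out.println(remainingDelay(scheduled,
                Instant.parse("2024-01-01T00:04:00Z"))); // PT6M
    }
}
```

A consumer of the retry topic would then sleep for `remainingDelay(...)` before reprocessing, instead of holding the delay in memory inside a retry operator.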

Upvotes: 0

Views: 42

Answers (0)
