VinothNair

Reputation: 644

Reactor Kafka - At-Least-Once - handling failures and offsets in a multi-partition topic

Below is the consumer code that receives messages from a Kafka topic (8 partitions) and processes them.

    @Component
    public class MessageConsumer {

        private static final String TOPIC = "mytopic.t";
        private static final String GROUP_ID = "mygroup";
        private final ReceiverOptions<String, String> consumerSettings;
        private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

        @Autowired
        public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions<String, String> consumerSettings)
        {
            this.consumerSettings = consumerSettings;
            consumerMessage();
        }

        private void consumerMessage()
        {
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

            Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

            Flux.defer(receiver::receive)
                    .groupBy(m -> m.receiverOffset().topicPartition())
                    .flatMap(partitionFlux ->
                            partitionFlux.publishOn(scheduler)
                                    .concatMap(m -> {
                                        LOG.info("message received from kafka : key : " + m.key() + " partition: " + m.partition());
                                        return process(m.key(), m.value())
                                                .thenEmpty(m.receiverOffset().commit());
                                    }))
                    .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
                    .doOnError(this::handleError)
                    .retry()
                    .doOnCancel(this::close)
                    .subscribe();
        }

        private void close() {
        }

        private void handleError(Throwable err) {
            LOG.error("kafka stream error : ", err);
        }

        private Mono<Void> process(String key, String value)
        {
            if (key.equals("error"))
                return Mono.error(new Exception("process error : "));

            LOG.info("message consumed : " + key);
            return Mono.empty();
        }


        public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
            return consumerSettings
                    .commitInterval(Duration.ZERO)
                    .commitBatchSize(0)
                    .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                    .addRevokeListener(p -> LOG.info("Group {} partitions revoked {}", GROUP_ID, p))
                    .subscription(topics);
        }
    }
    // Defined separately, e.g. in a @Configuration class:
    @Bean(name = "consumerSettings")
    public ReceiverOptions<String, String> getConsumerSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put("max.block.ms", "3000");
        props.put("request.timeout.ms", "3000");

        return ReceiverOptions.create(props);
    }

On receiving each message, my processing logic returns an empty Mono if the consumed message is processed successfully.

Everything works as expected when the processing logic returns no error.

But if I return an error to simulate an exception in the processing logic for a particular message, the message that caused the exception is skipped: the stream moves on to the next message.

What I want to achieve is: process the current message and, if it succeeds, commit the offset and move to the next record.

If there is any exception while processing a message, don't commit the current offset; retry the same message until it succeeds, and don't move to the next message until the current one has been processed successfully.
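
For example, something along these lines is what I am aiming for (just a sketch, untested; it reuses the receiver, scheduler and process() from my code above, and the retryBackoff arguments are placeholders):

    Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                    .publishOn(scheduler)
                    // concatMap handles one record at a time per partition,
                    // so the stream cannot advance past a failing record
                    .concatMap(m ->
                            // defer so every retry re-invokes process()
                            Mono.defer(() -> process(m.key(), m.value()))
                                    // retry only this record, with backoff
                                    .retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(2), Duration.ofMinutes(1))
                                    // commit only after processing succeeded
                                    .then(m.receiverOffset().commit())))
            .subscribe();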

Please let me know how to handle processing failures without skipping messages, so that the stream resumes from the offset of the message that threw the exception.

Regards,

Vinoth

Upvotes: 3

Views: 3472

Answers (2)

VinothNair

Reputation: 644

The code below works for me. The idea is to retry a failed message a configured number of times; if it still fails, move it to a failed-message queue and commit its offset. Messages from other partitions are processed concurrently the whole time.

If a message from a particular partition fails the configured number of times, restart the stream after a delay so that we handle dependency failures without hitting the failing dependency continuously.

    @Autowired
    public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions<String, String> consumerSettings, MessageProducer producer)
    {
        this.consumerSettings = consumerSettings;
        this.producer = producer;
        consumerMessage();
    }

    private void consumerMessage() {

        int numRetries = 3;

        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

        Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
                .groupBy(m -> m.receiverOffset().topicPartition());

        Flux<Void> f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
                Flux.just(b)
                        .concatMap(a -> {
                            LOG.info("processing message - order: {} offset: {} partition: {}", a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition());

                            return process(a.key(), a.value())
                                    .then(a.receiverOffset().commit())
                                    .doOnSuccess(d -> LOG.info("committing order {}: offset: {} partition: {} ", a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition()))
                                    .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ", a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition()));
                        })
                        .retryWhen(companion -> companion
                                .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} ", b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition(), s.getMessage()))
                                .zipWith(Flux.range(1, numRetries), (error, index) -> {
                                    if (index < numRetries) {
                                        LOG.info(" --> Retrying {} order: {} offset: {} partition: {} ", index, b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition());
                                        return index;
                                    } else {
                                        LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next", index, b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition());
                                        producer.sendMessages(ERROR_TOPIC, b.key(), b.value());
                                        // commit() returns a Mono; it must be subscribed to actually commit
                                        b.receiverOffset().commit().subscribe();
                                        throw Exceptions.propagate(error);
                                    }
                                })
                                // exponential backoff between retries of the same message
                                .flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 3)))
                                .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
                        ))
        );

        f1.doOnError(a -> {
                    LOG.info("Moving to next message because of : ", a);
                    try {
                        Thread.sleep(5000); // configurable delay before the stream restarts
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })
                .retry()
                .subscribe();
    }

    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .addRevokeListener(p -> LOG.info("Group {} partitions revoked {}", GROUP_ID, p))
                .subscription(topics);
    }

    private Mono<Void> process(String key, String value)
    {
        try {
            Thread.sleep(500); // simulate a slow response
        } catch (InterruptedException e) {
            // causes the restart
            e.printStackTrace();
        }

        if (key.startsWith("error")) // simulate an error scenario
            return Mono.error(new Exception("processing message failed for order: " + key));

        return Mono.empty();
    }

Upvotes: 3

JR ibkr

Reputation: 919

Create different consumer groups.

Each consumer group would be related to one database.

Create your consumers so that they only process the relevant events and push them to the related database. If the database is down, configure the consumer to retry an infinite number of times. If a consumer dies for any reason, make sure the replacement starts from where the earlier consumer left off. There is a small possibility that your consumer dies right after committing data to the database but before sending the ack to the Kafka broker; if that matters, update the consumer code so that messages are processed exactly once.
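
A rough sketch of that layout (the group ids and the idea of two target databases are hypothetical; the reactor-kafka setup mirrors the question's consumerSettings bean):

    // Same topic, one consumer group per database; each group keeps its own
    // committed offsets, so the two pipelines fail and retry independently.
    private Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit manually, after the DB write
        return props;
    }

    public ReceiverOptions<String, String> optionsForGroup(String groupId) {
        Map<String, Object> props = baseProps();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // e.g. "orders-db-group", "audit-db-group"
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton(TOPIC));
    }

Committing the offset only after the database write succeeds gives at-least-once delivery per database; making the writes idempotent is what closes the remaining gap to effectively exactly-once.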

Upvotes: 0
