Hessam

Reputation: 1415

How to skip an Avro serialization exception in KafkaStreams API?

I have a Kafka application written with the Kafka Streams Java API. It reads data from a MySQL binlog and does some processing that is irrelevant to my question. The problem is that one particular row produces an Avro deserialization error. I could dig into the Avro schema file and find the root cause, but what I really need is a forgiving exception handler that, upon encountering such an error, does not bring the whole application to a halt. This is the main part of my streams app:

    public static void main(String[] args) {
        Properties properties = configProperties();

        StreamsBuilder builder = new StreamsBuilder();
        StreamsBuilder streamsBuilder = watchForCourierUpdate(builder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    private static StreamsBuilder watchForCourierUpdate(StreamsBuilder builder){
        CourierUpdateListener courierUpdateListener = new CourierUpdateListener(builder);
        courierUpdateListener.start();
        return builder;
    }

    private static Properties configProperties(){

        Properties streamProperties = new Properties();

        streamProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Configs.getConfig("schemaRegistryUrl"));
        streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "courier_app");
        streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Configs.getConfig("bootstrapServerUrl"));
        streamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        streamProperties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state_dir");
        streamProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
        streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        streamProperties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                CourierSerializationException.class);

        return streamProperties;

    }

This is my CourierSerializationException class:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class CourierSerializationException implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> producerRecord, Exception e) {
        Logger.logError("Failed to de/serialize entity from " + producerRecord.topic() + " topic.\n" + e);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

Still, whenever an Avro deserialization exception happens, the stream shuts down and the application does not continue. Am I missing something?

Upvotes: 0

Views: 1197

Answers (1)

Vinit Pillai

Reputation: 516

Have you tried the default.deserialization.exception.handler provided by Kafka? You can use LogAndContinueExceptionHandler, which will log the error and continue processing.

I may be wrong, but I think a custom handler created by implementing ProductionExceptionHandler only covers errors on the producer side (e.g. network-related errors when writing to Kafka), not deserialization errors on the consuming side.

Add this to the properties and see what happens:

> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
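If you want your own logging instead of the built-in handler, a minimal sketch of a custom deserialization handler could look like the following. This is just an illustration: CourierDeserializationException is an example name, and Logger is assumed to be the same logging helper from your question.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CourierDeserializationException implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Log the poison-pill record and keep the stream thread alive
        Logger.logError("Failed to deserialize record from " + record.topic()
                + " (partition " + record.partition() + ", offset " + record.offset() + ")\n" + exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {
        // nothing to configure
    }
}

Then register it instead of LogAndContinueExceptionHandler:

> streamProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CourierDeserializationException.class);

You can keep the DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG entry if you also want to handle errors while writing to Kafka, but it will not catch the Avro deserialization failure you describe.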

Upvotes: 3
