Nikhil K

Reputation: 45

Can Kafka Streams consume messages in one format and produce them in another, such as Avro?

I am using Kafka Streams to consume JSON strings from one topic, process them, and generate a response to be stored in another topic. However, the message produced to the response topic needs to be in Avro format.

I have tried using a String serde for the key and SpecificAvroSerde for the value.

Following is my code to create the topology:

StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic());

Following is my config:

    if (schemaRegistry != null && schemaRegistry.length > 0) {
        streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
    }
    streamsConfig.put(this.keySerializerKeyName, StringSerde.class);
    streamsConfig.put(this.valueSerializerKeyName, SpecificAvroSerde.class);
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
    streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
    streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
    streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.parseInt(commitIntervalMs));
    streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfThreads);
    streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

I see the following error when I run the example:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
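The "Unknown magic byte!" error means the Avro deserializer was handed bytes that are not in Confluent's wire format: every Avro payload written through the schema registry starts with a magic byte (0x00) followed by a 4-byte schema ID. Plain JSON bytes have no such prefix, which strongly suggests SpecificAvroSerde is being applied to the JSON input rather than only to the Avro output. A minimal sketch (the payload contents are made up for illustration) of the framing check that fails:

```java
import java.nio.charset.StandardCharsets;

public class MagicByteCheck {
    public static void main(String[] args) {
        // Confluent wire format: [magic byte 0x00][4-byte schema id][Avro body]
        byte[] avroFramed = new byte[] {0x00, 0, 0, 0, 42, 0x02};

        // A plain JSON payload starts with '{' (0x7B), not the magic byte,
        // so KafkaAvroDeserializer rejects it with "Unknown magic byte!"
        byte[] jsonPayload = "{\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(avroFramed[0] == 0x00);   // true
        System.out.println(jsonPayload[0] == 0x00);  // false
    }
}
```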

Upvotes: 0

Views: 696

Answers (1)

Nishu Tayal

Reputation: 20860

The problem is with the key/value serdes. You must use the correct serdes when consuming the stream, and likewise when publishing it.

If your input is JSON and you want to publish it as Avro, you can do it as follows:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

StreamsBuilder builder = new StreamsBuilder();
// Override the default value serde on the input, since the topic holds plain JSON strings
KStream<String, String> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));

// Replace AvroObjectClass with your Avro object type
KStream<String, AvroObjectClass> consumerAvroStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerAvroStream.to(kafkaConfiguration.getProducerTopic());
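The `getKeyValueMapper(...)` step is where the JSON-to-Avro conversion actually happens; its implementation is not shown in the question. A rough sketch of such a mapper, assuming Jackson for JSON parsing and an Avro-generated class named `AvroObjectClass` with a single `name` field (both the class and the field name are hypothetical here):

```java
// Sketch only: AvroObjectClass stands in for your Avro-generated class,
// and the JSON field name "name" is an assumption.
KeyValueMapper<String, String, KeyValue<String, AvroObjectClass>> jsonToAvroMapper =
    (key, jsonValue) -> {
        try {
            // Parse the incoming JSON string
            JsonNode node = new ObjectMapper().readTree(jsonValue);
            // Build the Avro record from the parsed fields
            AvroObjectClass avro = AvroObjectClass.newBuilder()
                    .setName(node.get("name").asText())
                    .build();
            return KeyValue.pair(key, avro);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse JSON input", e);
        }
    };
```

If you prefer not to make SpecificAvroSerde the default value serde, you can instead override only the output serde, e.g. `consumerAvroStream.to(topic, Produced.with(Serdes.String(), avroSerde))`, where `avroSerde` is a SpecificAvroSerde instance configured with your schema registry URL.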

Upvotes: 1
