Reputation: 303
I need to create a stream from a topic that is consumed with the KafkaAvroDeserializer and not the standard Kafka deserializers. This is because, further down the line, it will be sent out to a topic that is used by the Confluent JDBC Sink connector (which does not support the standard serializers/deserializers). When creating the topic I used the KafkaAvroSerializer for both key and value.
My original code (before I changed to use the Kafka Avro Serializers for the key) was:
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));
NOTE: Serdes.String() above will not deserialize correctly, since the key was serialized using the KafkaAvroSerializer. So maybe there is another form of code that will allow me to build a stream without having to set the key Serde (so it defaults to what is in the config), and I can just set the value Serde (uploadSerde)?
If not, can someone tell me how I change the "Serdes.String()" standard deserializer to a KafkaAvroDeserializer? e.g.
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));
In my consumer, I am setting the correct default deserializers:
streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
If I use the form (and allow the defaults as specified in my consumer config, which are KafkaAvro):
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);
I get the following:
[2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....
I am using Java classes generated from avsc files, and the uploadSerde is initialised with the Java class generated from the Avro schema.
Thanks.
Upvotes: 4
Views: 5073
Reputation: 62330
The logic is the same for key and value. Thus, you can handle both the same way.
Your confusion is about setting the consumer deserializers in the config. Note that those configs are ignored (for internal reasons). You cannot directly configure the deserializers of the consumer; you always need to use Serdes. Thus, if you want to set a default deserializer for the consumer, you need to specify the default Serde in the config.
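For example, a minimal sketch of such a config, assuming Confluent's SpecificAvroSerde from the kafka-streams-avro-serde artifact (the application ID, bootstrap servers, and schema registry URL are placeholders):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class StreamsAvroDefaults {

    // Kafka Streams ignores KEY_DESERIALIZER_CLASS_CONFIG and
    // VALUE_DESERIALIZER_CLASS_CONFIG; set default Serdes instead.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fc-demo");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        // The Avro Serdes need to know where the schema registry lives:
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
        return props;
    }
}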
So I create a wrapper around the KafkaAvroSerializer and KafkaAvroDeserializer that instantiates these, and then use the wrapper for the key parameter in Consumed.with?
Exactly. Or you can also set this Serde as the default in the config.
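A sketch of what such a wrapper could look like, pairing the Confluent serializer and deserializer via Serdes.serdeFrom() (the factory class and the schema registry URL below are illustrative, not from the original post):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class WrappedAvroSerde {

    // Builds a Serde<T> backed by the Confluent Avro (de)serializers,
    // usable as the key (or value) Serde in Consumed.with().
    @SuppressWarnings("unchecked")
    public static <T> Serde<T> create(boolean isKey) {
        Map<String, Object> config = Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        serializer.configure(config, isKey);
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, isKey);
        return (Serde<T>) Serdes.serdeFrom(serializer, deserializer);
    }
}

The stream from the question would then be built along the lines of:

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(WrappedAvroSerde.<String>create(true), uploadSerde));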
Would have thought creating a stream from a topic with a KafkaAvroSerialize'd String key was a common use case
Not sure about this. If it's a plain String, I assume that people may use StringDeserializer directly instead of wrapping the String as Avro (not sure). Also note that it's recommended to use a schema registry when dealing with Avro. Confluent's schema registry ships with corresponding Avro Serdes: https://github.com/confluentinc/schema-registry/ (Disclaimer: I am an employee at Confluent.)
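As an illustration, a sketch using the ready-made SpecificAvroSerde from that repository (artifact kafka-streams-avro-serde), configured for a generated class such as the question's DocumentUpload; the schema registry URL is a placeholder:

import java.util.Collections;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class ConfluentAvroSerdes {

    // SpecificAvroSerde works with classes generated from .avsc files
    // (they implement SpecificRecord), e.g. DocumentUpload.
    static SpecificAvroSerde<DocumentUpload> uploadSerde() {
        Map<String, Object> config = Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
        SpecificAvroSerde<DocumentUpload> serde = new SpecificAvroSerde<>();
        serde.configure(config, false); // false = value Serde; pass true for a key Serde
        return serde;
    }
}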
Upvotes: 4