Reputation: 1030
I can't consume messages as their concrete Avro class. I get the following exception:
class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass
Here is the code (I use Spring Boot):
MyProducer.java
private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
// The constructor name must match the class name (MyProducer)
public MyProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(MyConcreteClass myConcreteClass) {
    // topic is defined elsewhere in the class (not shown)
    this.kafkaTemplate.send(topic, myConcreteClass);
}
MyConsumer.java
@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
    // handle
}
Note that if I change everything to GenericRecord, deserialization works properly, so I know the rest of the configuration (not pasted) is correct.
It may also matter that I didn't register the schema myself; I let my client code register it for me.
Any ideas?
EDIT:
Config:
@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    return new DefaultKafkaConsumerFactory<>(props);
}
Upvotes: 2
Views: 4468
Reputation: 41
I was blocked by the same issue. The KafkaAvroDeserializer was deserializing the message as GenericData$Record, so Spring Kafka looked in the class annotated with @KafkaListener for a @KafkaHandler method with a parameter of that type.
You need to add the following property to your Spring Kafka configuration so the deserializer returns the SpecificRecord classes directly. You first need to generate those classes with the Avro plugin:
spring:
  kafka:
    properties:
      specific.avro.reader: true
Then your consumer can look like this:
@KafkaListener(...)
public void consumeCreation(MyAvroGeneratedClass specificRecord) {
    log.info("Consuming record: {}", specificRecord);
}
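To make the effect of the flag concrete, here is a toy model in plain Java. It does not use the Confluent or Spring APIs: `deserialize`, `listen`, and the nested `MyAvroGeneratedClass` are hypothetical stand-ins, and a `LinkedHashMap` plays the role of `GenericData$Record`. It only illustrates why a typed listener parameter fails when the deserializer hands back a generic record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SpecificReaderToy {
    /** Hypothetical stand-in for a class generated by the Avro plugin. */
    static final class MyAvroGeneratedClass {
        final String id;

        MyAvroGeneratedClass(String id) {
            this.id = id;
        }
    }

    /**
     * Toy deserializer (an assumption, not the real KafkaAvroDeserializer):
     * returns a generic map unless the "specific reader" flag is set.
     */
    static Object deserialize(Map<String, Object> fields, boolean specificAvroReader) {
        if (specificAvroReader) {
            return new MyAvroGeneratedClass((String) fields.get("id"));
        }
        // Plays the role of GenericData$Record in this sketch
        return new LinkedHashMap<>(fields);
    }

    /** Toy listener: casts the payload the way a typed listener parameter would. */
    static String listen(Object payload) {
        try {
            MyAvroGeneratedClass typed = (MyAvroGeneratedClass) payload;
            return "handled " + typed.id;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", "42");
        System.out.println(listen(deserialize(fields, false))); // generic record: cast fails
        System.out.println(listen(deserialize(fields, true)));  // specific record: cast works
    }
}
```

With the flag off, the listener receives the generic stand-in and the cast fails. With it on, the payload already has the expected type.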
Upvotes: 1
Reputation: 11
In addition to OneCricketeer's answer: I hit another java.lang.ClassCastException after setting the specific Avro reader config. The message was: nested exception is java.lang.ClassCastException: class my.package.Envelope cannot be cast to class my.package.Envelope (my.package.Envelope is in unnamed module of loader 'app'; my.package.Envelope is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @3be312bd)
It seems Spring Boot DevTools loaded the class through its restart class loader, so the JVM treated it as a different class from the one loaded by the 'app' loader.
After I removed spring-boot-devtools from the pom, it worked as expected.
<!-- Remove this from pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>
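The underlying JVM behavior can be reproduced without Spring. The sketch below uses only the JDK: it compiles a tiny `Envelope` class into a temp directory, loads it through two separate `URLClassLoader` instances, and tries to cast an instance from one loader to the class from the other. The class name `ClassLoaderIdentityDemo` and the temp directory are illustrative, and it assumes a full JDK (not a JRE) so that a system compiler is available. It shows the general mechanism only, not how DevTools works internally.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

class ClassLoaderIdentityDemo {
    /** Returns true if casting across two loaders of the same class fails. */
    static boolean castFailsAcrossLoaders() {
        try {
            // Compile a trivial class into a temp directory (left on disk after the run)
            Path dir = Files.createTempDirectory("envelope-demo");
            Path src = dir.resolve("Envelope.java");
            Files.write(src, "public class Envelope {}".getBytes(StandardCharsets.UTF_8));
            JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
            if (compiler == null
                    || compiler.run(null, null, null, "-d", dir.toString(), src.toString()) != 0) {
                throw new IllegalStateException("a JDK compiler is required for this demo");
            }

            // Two independent loaders over the same class files; a null parent
            // means neither delegates to the application class loader
            URL[] classpath = { dir.toUri().toURL() };
            try (URLClassLoader first = new URLClassLoader(classpath, null);
                 URLClassLoader second = new URLClassLoader(classpath, null)) {
                Object envelope = first.loadClass("Envelope").getDeclaredConstructor().newInstance();
                Class<?> sameNameOtherLoader = second.loadClass("Envelope");
                try {
                    sameNameOtherLoader.cast(envelope);
                    return false;
                } catch (ClassCastException expected) {
                    // Same class name, different defining loader: a different runtime class
                    return true;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails across loaders: " + castFailsAcrossLoaders());
    }
}
```

A runtime class is identified by its name together with its defining class loader, which is why the exception message above can name the same class on both sides of the cast.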
Upvotes: 1
Reputation: 191953
MyConcreteClass needs to implement SpecificRecord (classes generated by Avro do this by extending SpecificRecordBase).
You can use the Avro Maven plugin to generate it from a schema.
Then you must configure the deserializer so it knows you want specific records:
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
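For reference, here is a sketch of the full consumer property map with the flag added, written with plain string keys so it compiles without the Kafka or Confluent jars. The keys are the string values behind the constants used in the question's config (for example, `SPECIFIC_AVRO_READER_CONFIG` is `"specific.avro.reader"`). The class name `SpecificReaderProps` and the URLs in `main` are placeholders, not values from the question.

```java
import java.util.HashMap;
import java.util.Map;

class SpecificReaderProps {
    /** Builds consumer properties equivalent to the question's config plus the specific-reader flag. */
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        // The missing piece: ask for SpecificRecord instances instead of GenericData$Record
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses for illustration only
        Map<String, Object> props = consumerProps("localhost:9092", "my-group", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the question's code, the same map would be passed to `new DefaultKafkaConsumerFactory<>(props)`.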
Upvotes: 1
Reputation: 105
You need to customize your consumer configuration. The value deserializer needs to be a KafkaAvroDeserializer with a reference to your schema registry.
Upvotes: -1