Reputation: 317
I was following tutorial on kafka connect, and I am wondering if there is a possibility to receive message that would be a type of some class.
Tutorial: https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
Like table presented in the tutorial, schema would look like below :
{
"namespace": "avro",
"type": "record",
"name": "Audit",
"fields": [
{"name": "c1", "type": "int"},
{"name": "c2", "type": "string"},
{"name": "create_ts", "type": "long"},
{"name": "update_ts", "type": "long"}
]
}
Based on the avro format i have generated a class with maven.
Then I have defined consumer factory with my type :
public ConsumerFactory<String, Audit> auditConsumerFactory() { ... )
And KafkaListener :
@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory = "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
System.out.println(audit);
this.latch.countDown();
}
But in the end i am getting error like this :
2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.
EDIT ConsumerFactory with Deserializer :
public ConsumerFactory<String, Audit> auditConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return new DefaultKafkaConsumerFactory(props);
}
Audit.avsc
{
"type": "record",
"name": "avro.Audit",
"fields": [
{
"name": "c1",
"type": "int"
},
{
"name": "c2",
"type": "string"
},
{
"name": "create_ts",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
},
{
"name": "update_ts",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
}
],
"connect.name": "avro.Audit"
}
I have found answer to my question on Github
Upvotes: 1
Views: 1640
Reputation: 101
I don't figure out if there's another thread about this issue, but finally Confluence fix this problem. Adding this three lines to the JDBC connector
"transforms": "AddNamespace", "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", "transforms.AddNamespace.schema.name": "my.namespace.NameOfTheSchema",
Upvotes: 4