drstonecodez

Reputation: 317

Kafka connect - JDBC custom Avro schema

I was following a tutorial on Kafka Connect, and I am wondering whether it is possible to receive messages typed as one of my own classes.

Tutorial: https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/

For a table like the one presented in the tutorial, the schema would look like this:

{
   "namespace": "avro",
   "type": "record",
   "name": "Audit",
   "fields": [
      {"name": "c1", "type": "int"},
      {"name": "c2", "type": "string"},
      {"name": "create_ts", "type": "long"},
      {"name": "update_ts", "type": "long"}
   ]
}

Based on the Avro schema, I have generated a class with Maven.
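For reference, this is roughly the avro-maven-plugin setup I would expect to generate that class; the plugin version and directories here are illustrative, not necessarily the exact ones from my pom:

<!-- generates SpecificRecord classes from *.avsc files under src/main/avro -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>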

Then I defined a consumer factory with my type:

public ConsumerFactory<String, Audit> auditConsumerFactory() { ... }

And a KafkaListener:

@KafkaListener(topics = "${kafka.mysql.topic}", containerFactory = "mysqlKafkaListenerContainerFactory")
public void receive(Audit audit) {
     System.out.println(audit);
     this.latch.countDown();
}
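
For completeness, this is roughly how the mysqlKafkaListenerContainerFactory that the listener refers to is wired up; the configuration class name here is just an example, and auditConsumerFactory() is the method shown further down in the EDIT:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // The bean name must match the containerFactory attribute on @KafkaListener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Audit> mysqlKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Audit> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // auditConsumerFactory() is declared in this same configuration class (see EDIT below)
        factory.setConsumerFactory(auditConsumerFactory());
        return factory;
    }
}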

But in the end I am getting an error like this:

2019-12-16 21:56:50.139 ERROR 31862 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mysql-audit-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class audit specified in writer's schema whilst finding reader's schema for a SpecificRecord.

EDIT: ConsumerFactory with the deserializer:

public ConsumerFactory<String, Audit> auditConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getKafkaBootstrapAddress());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(props);
}

Audit.avsc

{
  "type": "record",
  "name": "avro.Audit",
  "fields": [
    {
      "name": "c1",
      "type": "int"
    },
    {
      "name": "c2",
      "type": "string"
    },
    {
      "name": "create_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "update_ts",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    }
  ],
  "connect.name": "avro.Audit"
}

I have found the answer to my question on GitHub.

Upvotes: 1

Views: 1640

Answers (1)

bpdin

Reputation: 101

I'm not sure whether there's another thread about this issue, but Confluent finally fixed this problem. Adding these three lines to the JDBC connector configuration solves it:

"transforms": "AddNamespace", "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", "transforms.AddNamespace.schema.name": "my.namespace.NameOfTheSchema",

KAFKA-7883

Upvotes: 4
