Reputation: 101
I was following a tutorial on Kafka Connect, and I am wondering if there is a way to define a custom schema registry for a topic whose data comes from a MySQL table.
I can't find where to define it in my JSON/Connect config, and I don't want to create a new version of that schema after creating it.
My MySQL table, called stations, has this schema:
Field          | Type
---------------+-------------
code           | varchar(4)
date_measuring | timestamp
attributes     | varchar(256)
where attributes contains JSON data rather than a plain string (I have to use that column type because the JSON fields inside attributes are variable).
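For example, one (hypothetical) attributes value could be:

{"H1": 7, "H2": 5, "T": 21, "pH": 7, "Q": 0}

while another row might carry a different subset of the measurement fields.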
My connector config is:
{
  "name": "jdbc_source_mysql_stations",
  "_comment": "The Kafka topic will be made up of this prefix, plus the table name",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "transforms": "ValueToKey",
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": "code,date_measuring",
  "connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
  "connection.user": "confluent",
  "connection.password": "**************",
  "table.whitelist": "stations",
  "mode": "timestamp",
  "timestamp.column.name": "date_measuring",
  "validate.non.null": "false",
  "topic.prefix": "mysql-"
}
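I submit it to the Connect worker's REST API (assuming the worker runs on its default port 8083 and the config is saved as stations.json):

curl -X PUT -H "Content-Type: application/json" \
     --data @stations.json \
     http://localhost:8083/connectors/jdbc_source_mysql_stations/config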
Once the connector is running, it creates this schema:
{
  "subject": "mysql-stations-value",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}
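(This is what the Schema Registry returns for the subject; assuming the registry's default port, you can fetch it yourself with:

curl http://localhost:8081/subjects/mysql-stations-value/versions/latest

)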
Where "attributes" field is of course a String. Unlike I would apply it this other schema.
{
  "type": "record",
  "name": "stations",
  "namespace": "com.mycorp.mynamespace",
  "fields": [
    {"name": "code", "type": "string"},
    {"name": "date_measuring", "type": {"type": "long", "logicalType": "timestamp-millis", "connect.name": "org.apache.kafka.connect.data.Timestamp", "connect.version": 1}},
    {"name": "attributes", "type": {
      "type": "record",
      "name": "AttributesRecord",
      "fields": [
        {"name": "H1",  "type": "long", "default": 0},
        {"name": "H2",  "type": "long", "default": 0},
        {"name": "H3",  "type": "long", "default": 0},
        {"name": "H",   "type": "long", "default": 0},
        {"name": "Q",   "type": "long", "default": 0},
        {"name": "P1",  "type": "long", "default": 0},
        {"name": "P2",  "type": "long", "default": 0},
        {"name": "P3",  "type": "long", "default": 0},
        {"name": "P",   "type": "long", "default": 0},
        {"name": "T",   "type": "long", "default": 0},
        {"name": "Hr",  "type": "long", "default": 0},
        {"name": "pH",  "type": "long", "default": 0},
        {"name": "RX",  "type": "long", "default": 0},
        {"name": "Ta",  "type": "long", "default": 0},
        {"name": "C",   "type": "long", "default": 0},
        {"name": "OD",  "type": "long", "default": 0},
        {"name": "TU",  "type": "long", "default": 0},
        {"name": "MO",  "type": "long", "default": 0},
        {"name": "AM",  "type": "long", "default": 0},
        {"name": "N03", "type": "long", "default": 0},
        {"name": "P04", "type": "long", "default": 0},
        {"name": "SS",  "type": "long", "default": 0},
        {"name": "PT",  "type": "long", "default": 0}
      ]
    }}
  ]
}
Any suggestions, please? If it's not possible, I suppose I'll have to write a Kafka Streams application to produce another topic, although I would rather avoid that.
Thanks in advance!
Upvotes: 0
Views: 932
Reputation: 191671
I don't think you're actually asking about using a "custom" registry (that's what the two schema.registry.url lines configure), but rather how you can parse the data / apply a schema after the record is pulled from the database.
You can write your own Transform, or you can use Kafka Streams; those are really the main options here. There is a SetSchemaMetadata transform, but I'm not sure it will do what you want (parse a string into an Avro record). A sketch of each approach is below.
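If you go the custom Transform route, here is a minimal sketch. It is only a sketch, not a drop-in implementation: the class name com.mycorp.kafka.transforms.ParseJsonAttributes is hypothetical, only two of the measurement fields from your target schema are shown, and it assumes Jackson is available on the classpath (it ships with the Connect runtime).

package com.mycorp.kafka.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ParseJsonAttributes<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Structured schema for the parsed attributes (abridged: only two of
    // the measurement fields from the question's target schema are shown).
    private static final Schema ATTRS_SCHEMA = SchemaBuilder.struct()
            .name("AttributesRecord")
            .field("H1", SchemaBuilder.int64().defaultValue(0L).build())
            .field("T", SchemaBuilder.int64().defaultValue(0L).build())
            .build();

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record; // tombstones etc. pass through untouched
        }
        Struct value = (Struct) record.value();

        // Rebuild the value schema, swapping the string "attributes"
        // field for the structured one.
        SchemaBuilder builder = SchemaBuilder.struct().name(record.valueSchema().name());
        for (Field f : record.valueSchema().fields()) {
            builder.field(f.name(), "attributes".equals(f.name()) ? ATTRS_SCHEMA : f.schema());
        }
        Schema newSchema = builder.build();

        Struct newValue = new Struct(newSchema);
        for (Field f : record.valueSchema().fields()) {
            if ("attributes".equals(f.name())) {
                newValue.put("attributes", parseAttributes(value.getString("attributes")));
            } else {
                newValue.put(f.name(), value.get(f));
            }
        }

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    private Struct parseAttributes(String json) {
        JsonNode node;
        try {
            node = MAPPER.readTree(json);
        } catch (Exception e) {
            node = MAPPER.createObjectNode(); // malformed JSON: fall back to defaults
        }
        Struct attrs = new Struct(ATTRS_SCHEMA);
        for (Field f : ATTRS_SCHEMA.fields()) {
            attrs.put(f, node.path(f.name()).asLong(0L));
        }
        return attrs;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

You'd package that as a JAR, drop it on the worker's plugin.path, and chain it into the connector config, e.g. "transforms": "ValueToKey,ParseJson" with "transforms.ParseJson.type": "com.mycorp.kafka.transforms.ParseJsonAttributes".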
Or, if you must shove JSON data into a single database column, maybe you shouldn't use MySQL, and should instead use a document database, which has more flexible data constraints.
Otherwise, you can use a BLOB rather than VARCHAR and put binary Avro data into that column, but then you'd still need a custom deserializer to read the data.
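And if you go the Kafka Streams route instead, a minimal sketch could look like the following. Again, it is only a sketch under assumptions: the output topic name mysql-stations-parsed, the application id, and the abridged two-field attributes schema are all placeholders, and it uses Confluent's GenericAvroSerde pointed at the same registry the connector uses.

package com.mycorp.kafka.streams;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

public class ParseAttributesApp {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Abridged target schemas: only two of the measurement fields are shown.
    private static final Schema ATTRS_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"AttributesRecord\",\"fields\":["
          + "{\"name\":\"H1\",\"type\":\"long\",\"default\":0},"
          + "{\"name\":\"T\",\"type\":\"long\",\"default\":0}]}");

    private static final Schema STATION_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"stations\",\"fields\":["
          + "{\"name\":\"code\",\"type\":\"string\"},"
          + "{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
          + "{\"name\":\"attributes\",\"type\":" + ATTRS_SCHEMA + "}]}");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stations-attributes-parser");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Avro serdes backed by the same Schema Registry the connector uses.
        Map<String, String> srConfig =
                Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        GenericAvroSerde keySerde = new GenericAvroSerde();
        keySerde.configure(srConfig, true);    // isKey = true
        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(srConfig, false); // isKey = false

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("mysql-stations", Consumed.with(keySerde, valueSerde))
               .mapValues(ParseAttributesApp::parse)
               .to("mysql-stations-parsed", Produced.with(keySerde, valueSerde));

        new KafkaStreams(builder.build(), props).start();
    }

    // Re-shape one record: copy code/date_measuring, parse the attributes string.
    private static GenericRecord parse(GenericRecord in) {
        JsonNode node;
        try {
            node = MAPPER.readTree(in.get("attributes").toString());
        } catch (Exception e) {
            node = MAPPER.createObjectNode(); // malformed JSON: fall back to defaults
        }
        GenericRecord attrs = new GenericData.Record(ATTRS_SCHEMA);
        for (Schema.Field f : ATTRS_SCHEMA.getFields()) {
            attrs.put(f.name(), node.path(f.name()).asLong(0L));
        }
        GenericRecord out = new GenericData.Record(STATION_SCHEMA);
        out.put("code", in.get("code"));
        out.put("date_measuring", in.get("date_measuring"));
        out.put("attributes", attrs);
        return out;
    }
}

On the first record produced, the serde registers the new mysql-stations-parsed-value schema with the registry for you, so you don't have to create it by hand.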
Upvotes: 1