Reputation: 45
I'm reading data from a MySQL database table using a Kafka JDBC source connector and publishing it to the topic test-mysql-petai.
The database table has two fields, where id is the primary key:
+---------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(20) | YES | | NULL | |
+---------+-------------+------+-----+---------+----------------+
I need the value of the id field to be the key of the topic messages. I tried adding transforms to the JDBC connector properties.
JDBCConnector.properties:
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=dins&password=pw&serverTimezone=UTC
table.whitelist=petai
mode=incrementing
incrementing.column.name=id
schema.pattern=""
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
topic.prefix=test-mysql-jdbc-
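For reference, the intended effect of the two chained transforms on each record can be sketched in plain Python (this only illustrates the logic; it is not Kafka Connect's actual Java API, and the dict stands in for a Connect Struct):

```python
# Illustrative sketch of the SMT chain, assuming the record value
# is a plain dict such as {"id": 61, "name": "ttt"}.

def value_to_key(value, fields):
    # ValueToKey: build a struct-like key from the named value fields
    return {f: value[f] for f in fields}

def extract_field_key(key, field):
    # ExtractField$Key: replace the struct key with one field's value
    return key[field]

value = {"id": 61, "name": "ttt"}
key = extract_field_key(value_to_key(value, ["id"]), "id")
print(key)    # → 61
print(value)  # the value itself is left untouched
```

So the transform chain does produce a bare integer key; what the consumer shows on top of that comes from how the key is serialized.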
But when I read the keys and values using a consumer, I get the following:
Key = {"schema":{"type":"int32","optional":false},"payload":61}
Value ={"id":61,"name":"ttt"}
I need to get the following:
Key = 61
Value ={"id":61,"name":"ttt"}
What am I doing wrong? Any help is appreciated.
Thank you.
Upvotes: 4
Views: 4598
Reputation: 4375
If you don't want keys to include a schema, you can tell Kafka Connect by setting key.converter.schemas.enable=false.
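That setting can live in the worker configuration, or be overridden per connector. For example, assuming the JSON converter is used for keys, you could add these lines to the connector properties:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
```

With schemas disabled, the JsonConverter drops the {"schema":...,"payload":...} envelope and the key is written as the bare value, e.g. 61.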
For a detailed explanation, please see Kafka Connect Deep Dive – Converters and Serialization Explained by Robin Moffatt.
Upvotes: 5