Reputation: 6102
I am using the Confluent JDBC Sink Connector to capture all changes from a Kafka topic into a database. My messages are JSON without any attached schema. For example:
{ "key1": "value1", "key2": 100}
Here is my configuration:
name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true
The issue I have is that, because of a legacy system, I cannot change the message format, so my messages are plain JSON objects without schema information. Does the connector support mapping fields? For example, mapping from field A in the message to column B in the database.
Thanks
Upvotes: 0
Views: 3413
Reputation: 23
You can use a transform to add the schema. This way the payload/message sent to Kafka can stay small, and a transform on the Kafka Connect sink/source side adds the schema.
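As far as I know there is no stock SMT that infers a schema from schemaless JSON, so this would be a custom Single Message Transform. A minimal sketch of how such a transform could be wired into the sink configuration from the question (the AddSchemaTransform class name is hypothetical):

# AddSchemaTransform is a hypothetical custom SMT that wraps each value in a schema
transforms=addSchema
transforms.addSchema.type=com.example.transforms.AddSchemaTransform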
Upvotes: 1
Reputation: 582
There is another option: writing a consumer interceptor and attaching the schema to the value before it is consumed by the JDBC sink connector.
I tried it and it worked!
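As a rough illustration of that idea (not the answer's actual code), here is a minimal sketch of a Kafka ConsumerInterceptor that wraps each schemaless JSON value in the schema/payload envelope that JsonConverter expects when value.converter.schemas.enable=true. The package and class names are hypothetical, and the schema is hard-coded to the two example fields from the question:

package com.example.interceptors;  // hypothetical package

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AddSchemaInterceptor implements ConsumerInterceptor<byte[], byte[]> {

    // Connect schema for {"key1": "value1", "key2": 100}; adjust per topic.
    private static final String SCHEMA =
        "{\"type\":\"struct\",\"optional\":false,\"fields\":["
        + "{\"field\":\"key1\",\"type\":\"string\",\"optional\":true},"
        + "{\"field\":\"key2\",\"type\":\"int32\",\"optional\":true}]}";

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> wrapped = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<byte[], byte[]>> out = new ArrayList<>();
            for (ConsumerRecord<byte[], byte[]> rec : records.records(tp)) {
                // Wrap the schemaless JSON in the envelope JsonConverter expects
                // when schemas.enable=true: {"schema": {...}, "payload": {...}}
                String payload = new String(rec.value(), StandardCharsets.UTF_8);
                byte[] enveloped = ("{\"schema\":" + SCHEMA + ",\"payload\":" + payload + "}")
                        .getBytes(StandardCharsets.UTF_8);
                out.add(new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(),
                        rec.key(), enveloped));
            }
            wrapped.put(tp, out);
        }
        return new ConsumerRecords<>(wrapped);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

Depending on your Connect version, the interceptor can then be registered in the worker properties (consumer.interceptor.classes=com.example.interceptors.AddSchemaInterceptor) or, if client config overrides are allowed, per connector via consumer.override.interceptor.classes.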
Upvotes: -1
Reputation: 32090
You have to have a declared schema for your data to use the JDBC Sink. In practice this means that you need to either serialise your data as Avro (in conjunction with the Schema Registry), or produce JSON with the schema embedded in each message (and schemas.enable=true set on the converters).
If you don't have that option at the point where the data is produced into Kafka, you can build a stream processing stage that applies the schema. You can do this with something like Kafka Streams, or with KSQL. The output of this is a Kafka topic, which is then what you use as the source for Kafka Connect. An example of doing it in KSQL would be:
-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR,
                          KEY2 INT)
  WITH (KAFKA_TOPIC='send_1',
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO`
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO
  WITH (VALUE_FORMAT='AVRO') AS
  SELECT *
  FROM send_1_src;
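The sink connector then consumes SEND_1_AVRO instead of send_1. A sketch of the relevant changes to the question's configuration, assuming Confluent's AvroConverter and a Schema Registry reachable at a placeholder URL:

topics=SEND_1_AVRO
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081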
Upvotes: 1