Trần Kim Dự

Reputation: 6102

JDBC Sink Connector: How to map fields from the Kafka message to the database table's columns

I am using the Confluent JDBC Sink Connector to capture all changes from a Kafka topic into a database. My messages are plain JSON without any attached schema. For example:

{ "key1": "value1", "key2": 100}

Here is my configuration:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

The issue I have is: because of a legacy system, I cannot change the message format, so my messages are JSON objects without schema information. Does the library support field mapping? For example, mapping from field A in the message to column B in the database table.

Thanks

Upvotes: 0

Views: 3413

Answers (3)

Venkat

Reputation: 23

You can use a transform to add the schema. This way the payload/message produced to Kafka can stay small, and a transform added on the sink/source Kafka Connect side injects the schema on its own:

https://github.com/RedHatInsights/connect-transforms/blob/master/src/main/java/com/redhat/insights/kafka/connect/transforms/InjectSchema.java
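
A minimal sketch of how a transform like this could be wired into the sink connector's properties, assuming the InjectSchema class from the repository above is on the Connect worker's plugin path; only the standard transforms.* wiring is shown, the transform's own settings are whatever its documentation defines:

transforms=injectSchema
transforms.injectSchema.type=com.redhat.insights.kafka.connect.transforms.InjectSchema
# Any transform-specific properties (e.g. the schema to inject) go under
# transforms.injectSchema.* as described in the linked repository.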

Upvotes: 1

Abhishek Gayakwad

Reputation: 582

Another option is to write a consumer interceptor and attach the schema to the value before it is consumed by the JDBC sink connector.

I tried it and it worked!
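
For reference, an interceptor is attached to the consumers used by the sink tasks via the Connect worker configuration (the consumer. prefix passes the setting through to them). A sketch, where com.example.SchemaInjectingInterceptor is a hypothetical ConsumerInterceptor implementation that wraps each raw JSON value in a schema-bearing envelope:

# In the Kafka Connect worker properties
consumer.interceptor.classes=com.example.SchemaInjectingInterceptor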

Upvotes: -1

Robin Moffatt

Reputation: 32090

You have to have a declared schema for your data to use the JDBC Sink. In practice this means serialising it with a schema when it is produced to Kafka, for example as Avro, or as JSON with an embedded schema (schemas.enable=true).
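
For illustration, this is the envelope the JsonConverter expects when value.converter.schemas.enable=true; the message from the question would have to arrive looking something like this:

{
  "schema": {
    "type": "struct",
    "name": "send_1",
    "optional": false,
    "fields": [
      { "field": "key1", "type": "string", "optional": true },
      { "field": "key2", "type": "int32", "optional": true }
    ]
  },
  "payload": { "key1": "value1", "key2": 100 }
}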

If you don't have that option when the data is produced into Kafka, you can build a stream-processing stage that applies the schema. You can do this with something like Kafka Streams, or with KSQL. The output of this is a new Kafka topic, which is then what you use as the source for Kafka Connect. An example of doing it in KSQL would be:

-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR, 
                          KEY2 INT) 
  WITH (KAFKA_TOPIC='send_1', 
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO` 
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO 
  WITH (VALUE_FORMAT='AVRO') AS 
  SELECT * 
    FROM send_1_src;
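
The sink connector would then read the reserialised topic with the Avro converter instead of the JSON converter. A sketch of the properties that would change, assuming a Schema Registry running at localhost:8081 (the key converter may also need adjusting depending on how KSQL keys the output topic):

topics=SEND_1_AVRO
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081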

  • To learn more about KSQL see here.
  • You can find some great examples of patterns of stream processing with raw Kafka consumers vs Kafka Streams vs KSQL in Kafka Tutorials here.

Upvotes: 1
