Ragini Gupta
Ragini Gupta

Reputation: 147

How to add multiple topics in JDBC Sink Connector configuration and get topics data in multiple target tables?

Below is my JDBC-Sink connector configuration:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
behavior.on.null.values=ignore
table.name.format=kafka_Address_V1, kafka_Attribute_V1
connection.password=***********
topics=Address,Attribute
task.max=3
batch.size=500
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=user
name=sink-jdbc-connector
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://localhost:DB;
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id

If I use this configuration I am getting single table in target database in this kafka_Address_V1, kafka_Attribute_V1 format, which is combination of these two.

enter image description here

Please let me know how can I use to store different topics data in different Tables by using JDBC-Sink Connector.

Upvotes: 1

Views: 1989

Answers (1)

Robin Moffatt
Robin Moffatt

Reputation: 32090

Per the docs, table.name.format takes a single value, and defaults to using the topic name itself.

Edit: courtesy of @OneCricketeer, you can also just use table.name.format=kafka_${topic}_V1. The SMT below is useful for more complex transformations of the name.

To achieve what you want you can use the RegExRouter Single Message Transform to modify the topic as its processed by Kafka Connect

Try this:

transforms                             =changeTopicName
transforms.changeTopicName.type        =org.apache.kafka.connect.transforms.RegexRouter
transforms.changeTopicName.regex       =(.*)
transforms.changeTopicName.replacement =kafka_$1_V1

Upvotes: 3

Related Questions