Reputation: 147
Below is my JDBC-Sink connector configuration:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
behavior.on.null.values=ignore
table.name.format=kafka_Address_V1, kafka_Attribute_V1
connection.password=***********
topics=Address,Attribute
tasks.max=3
batch.size=500
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=user
name=sink-jdbc-connector
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://localhost:DB;
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id
With this configuration I get a single table in the target database named kafka_Address_V1, kafka_Attribute_V1, which is a combination of the two names.
How can I store data from different topics in different tables using the JDBC Sink connector?
Upvotes: 1
Views: 1989
Reputation: 32090
Per the docs, table.name.format takes a single value, and defaults to using the topic name itself.
Edit: courtesy of @OneCricketeer, you can also just use table.name.format=kafka_${topic}_V1. The SMT below is useful for more complex transformations of the name.
To achieve what you want, you can use the RegexRouter Single Message Transform to modify the topic name as it's processed by Kafka Connect.
Try this:
transforms=changeTopicName
transforms.changeTopicName.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.changeTopicName.regex=(.*)
transforms.changeTopicName.replacement=kafka_$1_V1
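To see what names this regex and replacement pair would produce, here is a minimal sketch that mimics the renaming with plain java.util.regex. It is not the connector's own code: the class TopicRenameSketch and the helper rename are hypothetical names made up for illustration, and the sketch assumes the transform only rewrites a topic when the regex matches the whole name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicRenameSketch {
    // Hypothetical helper (not part of Kafka Connect): applies a regex and
    // replacement to a topic name, and returns the name unchanged when the
    // regex does not match the entire name.
    public static String rename(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // The two topics from the question, with the regex and replacement from the answer.
        for (String topic : new String[] {"Address", "Attribute"}) {
            System.out.println(topic + " -> " + rename(topic, "(.*)", "kafka_$1_V1"));
        }
        // Prints:
        // Address -> kafka_Address_V1
        // Attribute -> kafka_Attribute_V1
    }
}
```

For the renamed topic to end up as the table name, table.name.format would need to be left at its default (the topic name) rather than set to the comma-separated value from the question.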
Upvotes: 3