Reputation: 13
This is related to the topic discussed in the thread below:
JDBC Sink Connector -upserting into multiple tables from multiples topics using kafka-connect
I know it's an older post, but my question is on the same topic. The difference is that I want to remove the prefix from the topic name and pass the remaining string as the table name in the sink connector. How can I achieve that?
If this can be done with an SMT, could you please show how?
Topic names: Source1-Emp, Source1-Company
Table names: Emp, Company
The data needs to be loaded from multiple topics into multiple tables using a single sink connector.
Could you please help me implement that?
MUS
Upvotes: 1
Views: 3631
Reputation: 32140
You're right that this can be done with a Single Message Transform (SMT). Here's an example that uses RegexRouter to strip the Source1- prefix from each topic name, so the remainder is used as the table name:
curl -X PUT http://localhost:8083/connectors/sink-postgres-00/config \
     -H "Content-Type: application/json" \
     -d '{
       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url": "jdbc:postgresql://postgres:5432/",
       "connection.user": "postgres",
       "connection.password": "postgres",
       "tasks.max": "1",
       "topics": "Source1-Emp,Source1-Company",
       "auto.create": "true",
       "auto.evolve": "true",
       "transforms": "dropPrefix",
       "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.dropPrefix.regex": "Source1-(.*)$",
       "transforms.dropPrefix.replacement": "$1"
     }'
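To check what the regex does before deploying, here is a minimal Python sketch of the renaming step. It assumes RegexRouter's behaviour of renaming a topic only when the regex matches the whole topic name and leaving it unchanged otherwise. The helper route_topic is hypothetical and only for illustration; it is not part of Kafka Connect. Note that Python writes the back-reference as \1 where the connector config uses $1.

```python
import re

# Pattern copied from the connector config; the replacement is the
# Python spelling (\1) of the config's Java-style back-reference ($1).
REGEX = r"Source1-(.*)$"
REPLACEMENT = r"\1"


def route_topic(topic: str) -> str:
    """Hypothetical helper that mimics the topic rename (an assumption, not Connect code)."""
    # Require the regex to match the entire topic name.
    match = re.fullmatch(REGEX, topic)
    if match is None:
        # Topics that do not match keep their original name.
        return topic
    # Substitute the captured group into the replacement template.
    return match.expand(REPLACEMENT)


if __name__ == "__main__":
    for name in ("Source1-Emp", "Source1-Company", "Other-Orders"):
        print(name, "->", route_topic(name))
```

With the default table naming, the JDBC sink uses the (renamed) topic as the table name, so a topic that does not start with Source1- would keep its full name as the table name.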
Upvotes: 4