Reputation: 11
I've been implementing a kafka connector for PostgreSQL (I'm using the debezium kafka connector and running all the pieces through docker). I need a custom partition key, and so I've been using the SMT to achieve this. However, the approach that I'm using creates a Struct, and I need it to be a string. This article runs through how to set up the partition key as an int, but I can't access the config file to set up the appropriate transforms. Currently my kafka connector looks like this
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname" : "postgres",
"database.server.name": "postgres",
"table.include.list": "public.table",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.table",
"transforms": "routeRecords,unwrap,createkey",
"transforms.routeRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeRecords.regex": "(.*)",
"transforms.routeRecords.replacement": "table",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createkey.fields": "id"
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
I know that I have to extract the value of the column but I'm just not sure how.
Upvotes: 1
Views: 1124
Reputation: 191701
ValueToKey
creates a Struct from a list of fields, as it is documented.
You need one more transform to extract a specific field from a Struct, as shown in the linked post.
org.apache.kafka.connect.transforms.ExtractField$Key
Note: This does not "set" the partition of the actual Kafka record, only the key, which is then hashed by the Producer to get the partition
Upvotes: 1