Irene Smith

Reputation: 11

SMTs to create a string partition key for a Kafka connector through the connector config

I've been implementing a Kafka connector for PostgreSQL (I'm using the Debezium connector and running all the pieces through Docker). I need a custom partition key, so I've been using an SMT (Single Message Transform) to create one. However, my current approach produces a Struct key, and I need it to be a string. This article runs through how to set up the partition key as an int, but I can't access the config file to set up the appropriate transforms. Currently my connector registration looks like this:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "postgres",
        "table.include.list": "public.table",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.table",
        "transforms": "routeRecords,unwrap,createkey",
        "transforms.routeRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.routeRecords.regex": "(.*)",
        "transforms.routeRecords.replacement": "table",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createkey.fields": "id",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}'

I know that I have to extract the value of the column from the Struct, but I'm not sure how.

Upvotes: 1

Views: 1124

Answers (1)

OneCricketeer

Reputation: 191701

As documented, ValueToKey creates a Struct from a list of fields, even when that list contains only one field.

You need one more transform, chained after ValueToKey, to extract a specific field from that Struct, as shown in the linked post:

org.apache.kafka.connect.transforms.ExtractField$Key
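
As a rough sketch, assuming the rest of the question's config stays unchanged, the transform entries inside the "config" object might end up looking like this. The alias extractkey is just an illustrative name (any alias works as long as it matches the property prefixes), and the field id is taken from the question's ValueToKey settings.

```json
{
  "transforms": "routeRecords,unwrap,createkey,extractkey",
  "transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createkey.fields": "id",
  "transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractkey.field": "id"
}
```

Transforms run in the order they are listed, so extractkey has to come after createkey. Since the curl body is wrapped in single quotes, the shell will not try to expand $Key. With StringConverter already set as key.converter, the extracted id should then be written as its plain string form instead of a Struct.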

Note: This does not "set" the partition of the actual Kafka record, only its key. The producer then hashes that key to choose the partition.

Upvotes: 1
