Anu Pabla
Anu Pabla

Reputation: 31

Kafka Connect Debezium with ms sql server. Key extract configuration problem

I am working with Debezium and kafka connect. I need to extract "listid" from the payload below and be able to assign it to the kafka message key. Using the configs in my connector file I am unable to extract the value. Appreciate any assistance with solving this.

{
    "before": null,
    "after": {
        "listid": 19,
        "billingid": "0",
        "userid": "test",

Connector properties

key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false


value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid


key.converter.schemas.enable=false
value.converter.schemas.enable=true

Sooooo I am continuing to experiment and wanted to share a better explanation of the problem and what I have experimented with.

Following along with Robin's blog https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

1- running kafka-avro consumer from the terminal

KEY

{"LIST_ID":10058}

VALUE

{"before":null,"after":{"test.dbo.test.Value":{"LIST_ID":10058,"billingid 
etc...

2- I am trying to get the key to be the value 10058

3 - This is what i have for my transform configuration

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID

I am using debezium connector in my pipeline against mssql server.

Upvotes: 0

Views: 932

Answers (1)

Iskuskov Alexander
Iskuskov Alexander

Reputation: 4375

Looks like a typo (extract vs extractkey):

transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid

Upvotes: 1

Related Questions