Reputation: 39810
I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:
{
"data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"A"
}
},
"beforeData":{
"Data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"B"
}
}
},
"headers":{
"operation":"UPDATE",
"timestamp":"2018-05-03T13:53:43.000"
}
}
What configuration is needed in the sink file in order to extract all the (sub)fields under data
and headers
and ignore those under beforeData
so that the target table in which the data will be transferred by Kafka Sink will contain the following fields:
USER_ID, USER_CATEGORY, operation, timestamp
I went through the transformation list in confluent's docs but I was not able to find how to use them in order to achieve the aforementioned target.
Upvotes: 10
Views: 15161
Reputation: 318
If you're willing to list specific field names, you can solve this by:
rename
to make the field names be what you want the sink to emitwhitelist
to limit the emitted fields to those you selectFor your case it might look like:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
Upvotes: 10
Reputation: 191725
I think you want ExtractField
, and unfortunately, it's a Map.get
operation, so that means 1) nested fields cannot be gotten in one pass 2) multiple fields need multiple transforms.
That being said, you might to attempt this (untested)
transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers
If that doesn't work, you might be better off implementing your own Transformations package that can at least drop values from the Struct / Map.
Upvotes: 9