romaneso

Reputation: 1122

Transform Kafka Connect JDBC Source Connector output to custom format

I've implemented a Kafka Connect JDBC Source connector that is connected to an Oracle database and writes data to a Kafka topic. Currently I've set value.converter=org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=false. This makes it possible to write plain JSON data to the Kafka topic (which works fine, by the way), but it doesn't provide a way to modify the data before it is sent to the Kafka broker.

My question now is: is there a way to modify the data that is being sent to the Kafka topic? In my case, the Source connector runs a custom query and writes the result directly to the Kafka topic. However, I want to extend this JSON with some custom columns and nesting. Is there a way to do so?
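For reference, here is a trimmed sketch of the kind of connector configuration I'm running (names and connection details are placeholders):

name=oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//db-host:1521/MYDB
mode=bulk
query=SELECT ...
topic.prefix=my-topic
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false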

Upvotes: 1

Views: 3443

Answers (2)

Vassilis

Reputation: 1054

Here is an example configuration that applies some SMTs to rename fields, remove a field, and add a field.

Process:

DB table -(connector infers the schema of the fields)-> input Connect fields (Connect's internal data structure, the ConnectRecord) -> SMT1 -> SMT2 -> ... -> last SMT -> JsonConverter -> output JSON message.

DB Table:

current_name1 | current_name2 | FieldToDrop
bla1          | bla2          | bla3

input Connect fields inferred:

  "current_name1" = "bla1" // this is a connect record
  "current_name2" = "bla2" // this is a connect record
  "FieldToDrop" =  "bla3" // this is a connect record

output JSON for the value:

{
  "new_name1": "bla1",
  "new_name2": "bla2",
  "type": "MyCustomType"
}

Connector configuration:

name=example-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
...

transforms=RenameFields,InsertFieldType,DropFields

transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=current_name1:new_name1,current_name2:new_name2

transforms.InsertFieldType.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertFieldType.static.field=type
transforms.InsertFieldType.static.value=MyCustomType

transforms.DropFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropFields.blacklist=FieldToDrop
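Note: newer Apache Kafka versions rename ReplaceField's blacklist/whitelist options to exclude/include, so check which names your Connect version expects.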

Upvotes: 1

Robin Moffatt

Reputation: 32140

  1. Please don't use JsonConverter & schemas.enable=false :-) Your data in Oracle has such a wonderful schema; it is a shame to throw it away! In all seriousness, using something like Avro, Protobuf, or JSON Schema keeps your message sizes small in the Kafka topic whilst retaining the schema. (A sketch of the converter settings follows after this list.)

    See articles like this one for more details on this important concept.

  2. Single Message Transforms (SMTs) are probably what you're looking for to transform the data en route to Kafka. For example, you can insert fields, flatten payloads, and lots more. If there isn't an existing SMT to do what you want, you can write your own using the Java API (see the custom SMT sketch after this list).

    You can also use Kafka Streams or ksqlDB to do stream processing on the data once it's in Kafka, if you want to do more complex work like joining, aggregating, etc.
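For the first point, here is a minimal sketch of the converter settings when using Avro with Confluent Schema Registry (the registry URL is a placeholder for your environment):

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081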
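For the second point, if no built-in SMT does what you need, a custom one is a small piece of Java against the Connect Transformation API. Here's a minimal sketch, assuming schemaless Map values (as with schemas.enable=false); the package, class, and field names are made up for illustration:

package com.example.smt;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class AddCustomFields<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Only handle schemaless Map values; pass anything else through untouched.
        if (!(record.value() instanceof Map)) {
            return record;
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> value = (Map<String, Object>) record.value();

        Map<String, Object> updated = new HashMap<>(value);
        updated.put("type", "MyCustomType");          // add a custom column

        Map<String, Object> nested = new HashMap<>(); // add a nested object
        nested.put("source", "oracle");
        updated.put("meta", nested);

        // Build a new record with the modified value and no value schema (schemaless).
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                null, updated,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Package it as a JAR, put it on the Connect worker's plugin.path, and reference it from the connector config, e.g. transforms.AddCustom.type=com.example.smt.AddCustomFields.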

Upvotes: 4
