Reputation: 1122
I've implemented a Kafka Connect JDBC Source connector that is connected to an Oracle database and writes data to a Kafka topic. Currently, I've set the option value.converter=org.apache.kafka.connect.json.JsonConverter
with value.converter.schemas.enable=false.
This makes it possible to write JSON data to the Kafka topic (which works fine, by the way), but it doesn't give me a way to modify the data before it is sent to the Kafka broker.
My question now is: Is there a way to modify the data that is being sent to the Kafka topic? In my case, the Source Connector runs a custom query and writes the result directly to the Kafka topic. However, I want to extend this JSON with some custom columns and nesting. Is there a way to do so?
Upvotes: 1
Views: 3443
Reputation: 1054
Here's an example configuration that uses some SMTs (Single Message Transforms) to rename fields, remove fields, and add a field.
Process:
DB table -> (connector infers the schema of the fields) -> input Connect fields (internal Connect data structure / ConnectRecord(s)) -> SMT1 -> SMT2 -> ... -> last SMT -> JsonConverter -> output JSON message.
DB Table:
current_name1 | current_name2 | FieldToDrop
bla1          | bla2          | bla3
input Connect fields inferred:
"current_name1" = "bla1" // this is a connect record
"current_name2" = "bla2" // this is a connect record
"FieldToDrop" = "bla3" // this is a connect record
Output JSON for the value:
{
"new_name1": "bla1",
"new_name2": "bla2",
"type": "MyCustomType"
}
Connector configuration:
name=example-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
...
transforms=RenameFields,InsertFieldType,DropFields
transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=current_name1:new_name1,current_name2:new_name2
transforms.InsertFieldType.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertFieldType.static.field=type
transforms.InsertFieldType.static.value=MyCustomType
transforms.DropFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropFields.blacklist=FieldToDrop
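The question also mentions nesting. One way to get a nested output, as a sketch, is the built-in HoistField SMT, which wraps the whole value inside a single named field (the transform alias WrapValue and the field name payload below are my own choices, not required names):
transforms=RenameFields,InsertFieldType,DropFields,WrapValue
transforms.WrapValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapValue.field=payload
With that appended to the chain, the output value becomes {"payload": {"new_name1": "bla1", "new_name2": "bla2", "type": "MyCustomType"}}.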
Upvotes: 1
Reputation: 32140
Please don't use JsonConverter with schemas.enable=false :-) Your data in Oracle has such a wonderful schema; it is a shame to throw it away! In all seriousness, using something like Avro, Protobuf, or JSON Schema keeps your message sizes small in the Kafka topic whilst retaining the schema.
See articles like this one for more details on this important concept.
Single Message Transforms (SMTs) are probably what you're looking for to transform the data en route to Kafka. For example, you can insert fields, flatten payloads, and lots more. If there isn't an existing SMT to do what you want, you can write your own using the Java API.
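If you do end up writing your own, a minimal sketch of a custom SMT might look like the following (the package, class name, and the hard-coded "source" field are assumptions for illustration, not an existing transform):

package com.example.smt;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical custom SMT: copies every existing field and adds a constant "source" field to the value.
public class AddSourceTag<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Only handle schema'd Struct values; pass anything else through unchanged.
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct oldValue = (Struct) record.value();
        Schema oldSchema = oldValue.schema();

        // Rebuild the value schema with one extra optional string field.
        SchemaBuilder builder = SchemaBuilder.struct().name(oldSchema.name());
        for (Field f : oldSchema.fields()) {
            builder.field(f.name(), f.schema());
        }
        builder.field("source", Schema.OPTIONAL_STRING_SCHEMA);
        Schema newSchema = builder.build();

        // Copy the existing values and set the new field.
        Struct newValue = new Struct(newSchema);
        for (Field f : oldSchema.fields()) {
            newValue.put(f.name(), oldValue.get(f));
        }
        newValue.put("source", "oracle");

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                newSchema, newValue,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

You would then package this as a JAR, drop it on the Connect worker's plugin.path, and reference it in the transforms configuration like any built-in SMT.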
You can also use Kafka Streams or ksqlDB to do stream processing on the data once it's in Kafka if you want to do more complex work like joining, aggregating, etc.
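As a rough illustration of the stream-processing route, here is a small Kafka Streams sketch that adds a field to each schemaless JSON message; the topic names, application id, and the added field are assumptions, not something prescribed by your setup:

package com.example.streams;

import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class EnrichTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrich-oracle-rows");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ObjectMapper mapper = new ObjectMapper();
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("oracle-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> {
                   try {
                       // Parse the schemaless JSON value and add an extra field.
                       ObjectNode node = (ObjectNode) mapper.readTree(value);
                       node.put("type", "MyCustomType");
                       return mapper.writeValueAsString(node);
                   } catch (Exception e) {
                       // Pass malformed messages through unchanged.
                       return value;
                   }
               })
               .to("oracle-topic-enriched", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}

For simple per-message changes the SMT approach above is usually enough; Kafka Streams or ksqlDB earn their keep once you need joins, aggregations, or state.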
Upvotes: 4