Reputation: 61
How do I retrieve the headers of an incoming Kafka message with Kafka Connect, so that the MongoDB Sink Connector stores them as additional fields in the MongoDB document?
I have a Kafka topic "PROJECT_EXAMPLE_TOPIC". As you can see from the configuration below, I am already able to save the message timestamp, the incoming message data, and the MongoDB document created/updated dates.
I assume there is a transform or function somewhere that can extract the headers, but I have not found it.
Example kafka value
// incoming kafka value
{
"msgId" : "exampleId"
}
// expected example (option 1: a header stored as a top-level field)
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_header_foo" : "header_foo_value"
}
// expected example (option 2: all headers stored in a nested "message_headers" object)
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_headers" : {
"header_001" : "header_001_value",
"header_002" : "header_002_value",
...
"header_x" : "header_x_value"
}
}
Here is my configuration:
{
"name": "sink-mongo-PROJECT-EXAMPLE",
"config": {
"topics": "PROJECT_EXAMPLE_TOPIC",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
"key.converter.schemas.enable": "false",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
"value.converter.schemas.enable": "false",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
"connection.uri": "PROJECT_REFERENTIAL_MONGO_URL",
"database": "PROJECT_DB_NAME",
"collection": "EXAMPLE",
"max.num.retries": "3",
"retries.defer.timeout": "5000",
"key.projection.type": "none",
"key.projection.list": "",
"field.renamer.mapping": "[]",
"field.renamer.regex": "[]",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
"value.projection.list": "msgId",
"value.projection.type": "whitelist",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
"delete.on.null.values": "false",
"max.batch.size": "0",
"rate.limiting.timeout": "0",
"rate.limiting.every.n": "0",
"change.data.capture.handler": "",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"transforms": "InsertSource,InsertTopic,InsertTimestamp",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "message_source",
"transforms.InsertSource.static.value": "mongo_connector",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimestamp.timestamp.field": "message_timestamp"
}
}
Upvotes: 6
Views: 4495
Reputation: 1438
This is a bit of an old question, but there is a third-party single message transform (SMT) that can convert headers into fields on either the key or the value.
It won't let you grab all headers at once, though: you need to specify, by name, each header you want to extract, along with its type.
Upvotes: 1