Reputation: 11
I am trying to add the Kafka record metadata to the output that the S3 sink connector writes to the S3 bucket.
Currently, the output contains only the values of the messages from the Kafka topic.
I want each record wrapped with the following metadata: topic, timestamp, partition, offset, key, value
example:
{
  "topic": "some-topic",
  "timestamp": "some-timestamp",
  "partition": "some-partition",
  "offset": "some-offset",
  "key": "some-key",
  "value": "the-orig-value"
}
Note: when I fetch the messages with a consumer, I get all of this metadata, as I wished.
my connector configuration:
{
  "name" : "test_s3_sink",
  "config" : {
    "connector.class" : "io.confluent.connect.s3.S3SinkConnector",
    "errors.log.enable" : "true",
    "errors.log.include.messages" : "true",
    "flush.size" : "10000",
    "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
    "name" : "test_s3_sink",
    "rotate.interval.ms" : "60000",
    "s3.bucket.name" : "some-bucket-name",
    "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
    "topics" : "some.topic",
    "topics.dir" : "some-dir"
  }
}
Thanks.
Upvotes: 1
Views: 1332
Reputation: 191743
Currently, the output is just the values from the messages from the kafka topic.
Correct, this is the documented behavior. There is a setting for including the key data, which your config is missing, if you want that as well, but there are no settings to get the rest of the metadata.
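If I recall correctly, newer versions of the Confluent S3 sink expose store.kafka.keys / keys.format.class (and store.kafka.headers / headers.format.class) for this; treat the exact property names as an assumption and check them against the connector version you are running. A minimal sketch of what you would add to the "config" block in your question:
"store.kafka.keys" : "true",
"keys.format.class" : "io.confluent.connect.s3.format.json.JsonFormat"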
For the record timestamp, you could edit your producer code to simply add it as part of your record values (and everything else, for that matter, if you are able to query the topic's next offset every time you produce).
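As a rough sketch of that producer-side approach, assuming string-serialized JSON values and the topic name from your question (the envelope fields and names here are just illustrative):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EnvelopeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String key = "some-key";
            String originalValue = "the-orig-value";
            // Embed the timestamp (and any other fields you control) directly in the
            // value, so the S3 sink writes them out as part of the record
            String wrapped = String.format(
                    "{\"timestamp\":%d,\"key\":\"%s\",\"value\":\"%s\"}",
                    System.currentTimeMillis(), key, originalValue);
            producer.send(new ProducerRecord<>("some.topic", key, wrapped));
        }
    }
}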
For the topic and partition, those are part of the S3 object path, so whatever you are reading the files with should be able to parse that information out; the starting offset is also part of the filename, and you can add the line number within the file to get the (approximate) offset of each record.
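For example, with the default partitioner the object keys look roughly like <topics.dir>/<topic>/partition=<p>/<topic>+<p>+<startOffset>.json (my recollection of the default naming; verify against your connector version), so a reader could recover the metadata with something like:
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class S3KeyParser {
    // Matches the default Confluent S3 sink object key layout, e.g.
    // some-dir/some.topic/partition=3/some.topic+3+0000821100.json
    private static final Pattern KEY_PATTERN =
            Pattern.compile(".*/(?<topic>[^/+]+)\\+(?<partition>\\d+)\\+(?<offset>\\d+)\\.\\w+$");

    public static void main(String[] args) {
        String key = "some-dir/some.topic/partition=3/some.topic+3+0000821100.json";
        Matcher m = KEY_PATTERN.matcher(key);
        if (m.matches()) {
            System.out.println("topic=" + m.group("topic"));
            System.out.println("partition=" + Integer.parseInt(m.group("partition")));
            // Starting offset of the file; add the record's line number within
            // the file to approximate that record's offset
            System.out.println("startOffset=" + Long.parseLong(m.group("offset")));
        }
    }
}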
Or, you can use a Connect transform, such as this Archive one, which relocates the Kafka record metadata (except the offset and partition) into the Connect Struct value so that the sink connector writes all of it to the files:
https://github.com/jcustenborder/kafka-connect-transform-archive
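A sketch of wiring that transform into the config from your question; the transform class name is taken from the linked repository, so double-check it against the version you install:
"transforms" : "archive",
"transforms.archive.type" : "com.github.jcustenborder.kafka.connect.archive.Archive"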
Either way, ConnectRecord has no offset field; SinkRecord does, but I think that is too late in the API for transforms to access it. In other words, a feature request or a different connector would be necessary to get the offset included.
Upvotes: 1
Reputation: 1
The latest versions of the Lenses S3 Connector allow you to output the metadata, including headers.
Data in S3 will appear like this:
{
  "key": <the message key, which can be a primitive or a complex object>,
  "value": <the message value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "some.topic"
  }
}
Connect config:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO myTargetS3Bucket:somePrefix SELECT * FROM 'some.topic' STOREAS AVRO PROPERTIES ('store.envelope'=true)
topics=some.topic
name=test_s3_sink
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
It's explained here: https://lenses.io/blog/2023/09/open-source-lenses-kafka-s3-connector/
Upvotes: 0