Reputation: 1452
I've been struggling with a problem using Kafka Connect and the S3 sink.
First, the structure of a record:
{
Partition: number
Offset: number
Key: string
Message: json string
Timestamp: timestamp
}
Normally when posting to Kafka, the timestamp should be set by the producer. Unfortunately, there seem to be cases where this didn't happen, which means that the Timestamp might sometimes be null.
To extract this timestamp, the connector was configured with "timestamp.extractor":"Record".
It is certain, however, that the Message field itself always contains a timestamp as well. The Message looks like this:
{
timestamp: "2019-04-02T06:27:02.667Z"
metadata: {
creationTimestamp: "1554186422667"
}
}
My question is how to use that field for the timestamp.extractor instead.
I thought the following would suffice, but it doesn't seem to work:
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
This results in a NullPointerException as well.
Any ideas on how to use the timestamp from the Kafka message payload itself, instead of the default timestamp field that is set for Kafka v0.10+?
EDIT: Full config:
{ "name": "<name>",
"config": {
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"4",
"topics":"<topic>",
"flush.size":"100",
"s3.bucket.name":"<bucket name>",
"s3.region": "<region>",
"s3.part.size":"<partition size>",
"rotate.schedule.interval.ms":"86400000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"locale":"ENGLISH",
"timezone":"UTC",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",
"max.poll.interval.ms": "600000",
"request.timeout.ms": "610000",
"heartbeat.interval.ms": "6000",
"session.timeout.ms": "20000",
"s3.acl.canned":"bucket-owner-full-control"
}
}
EDIT 2: Kafka message payload structure:
{
"reference": "",
"clientId": "",
"gid": "",
"timestamp": "2019-03-19T15:27:55.526Z"
}
EDIT 3:
{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}
So I tried applying a transform to the object, but I'm stuck again: the pattern seems to be invalid. From what I can find online, this does look like a valid SimpleDateFormat pattern. It seems to be complaining about the 'T'. I've updated the message schema above as well.
Upvotes: 1
Views: 5546
Reputation: 191691
If the data is a string, then Connect will try to parse it as milliseconds - source code here.
In any case, message.timestamp assumes the data looks like { "message" : { "timestamp": ... } }, so just timestamp would be correct. Nested fields didn't use to be supported anyway, so you might want to clarify which version of Connect you have.
I'm not entirely sure how you would get instanceof Date to evaluate to true when using the JSON Converter. Even if you had set schemas.enable = true, you can see in the code that there are only conditions for schema types of numbers and strings, and it still assumes the value is in milliseconds.
You can try using the TimestampConverter transformation to convert your date string.
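To make the millisecond assumption concrete, here is a minimal Python sketch (Connect itself is written in Java, and this is not its code). It shows that an ISO-8601 string like the one in the question cannot be read as a millisecond number, and what the equivalent epoch-millisecond value would be after a conversion. The helper names `is_epoch_millis` and `iso_to_epoch_millis` are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def is_epoch_millis(value: str) -> bool:
    """Return True if the string can be read as a whole number of milliseconds."""
    try:
        int(value)
        return True
    except ValueError:
        return False


def iso_to_epoch_millis(value: str) -> int:
    """Convert a UTC timestamp such as 2019-04-02T06:27:02.667Z to epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


iso_value = "2019-04-02T06:27:02.667Z"       # "timestamp" from the question
millis_value = "1554186422667"               # "creationTimestamp" from the question

print(is_epoch_millis(iso_value))            # False: not a millisecond number
print(is_epoch_millis(millis_value))         # True
print(iso_to_epoch_millis(iso_value))        # 1554186422667
```

The converted value matches the creationTimestamp in the question's sample message, so the two fields describe the same instant. Only the second is already in the form a millisecond-based parser expects.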
Upvotes: 1
Reputation: 32080
Based on the schema you've shared, you should be setting:
"timestamp.extractor":"RecordField",
"timestamp.field":"timestamp",
i.e. no message prefix on the timestamp field name.
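As a rough illustration of why the prefix matters, the Python sketch below resolves a dotted field name against the payload from the question. The `get_nested` helper is hypothetical and only mimics the general idea of a dotted-path lookup; it is not how the connector is implemented.

```python
import json
from typing import Any, Optional


def get_nested(record: dict, dotted_path: str) -> Optional[Any]:
    """Walk a dict along a dotted path, returning None if any step is missing."""
    current: Any = record
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Payload structure taken from EDIT 2 of the question.
payload = json.loads(
    '{"reference": "", "clientId": "", "gid": "", '
    '"timestamp": "2019-03-19T15:27:55.526Z"}'
)

# There is no top-level "message" key, so the nested path finds nothing.
print(get_nested(payload, "message.timestamp"))  # None
# The plain field name resolves to the value.
print(get_nested(payload, "timestamp"))          # 2019-03-19T15:27:55.526Z
```

A lookup that comes back empty and is then used without a null check would be consistent with the NullPointerException described in the question, though that is an inference rather than something confirmed from the connector's source.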
Upvotes: 1