Hespen

Reputation: 1452

Kafka Connect S3 sink - how to use the timestamp from the message itself [timestamp extractor]

I've been struggling with a problem using kafka connect and the S3 sink.

First the structure:

{
   Partition: number
   Offset: number
   Key: string
   Message: json string
   Timestamp: timestamp
}

Normally, when posting to Kafka, the timestamp should be set by the producer. Unfortunately, there seem to be cases where this didn't happen, which means the Timestamp field can sometimes be null.

To extract this timestamp, the connector was configured with: "timestamp.extractor":"Record".

Now, the Message field itself is guaranteed to contain a timestamp as well.

Message:

{
   "timestamp": "2019-04-02T06:27:02.667Z",
   "metadata": {
     "creationTimestamp": "1554186422667"
   }
}

The question is how I can now use that field for the timestamp.extractor.

I was thinking that this would suffice, but this doesn't seem to work:

"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",

This results in a NullPointerException as well.

Any ideas on how to use the timestamp from the Kafka message payload itself, instead of the default timestamp field that Kafka sets for v0.10+?
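One route I've been considering is writing a custom extractor. This is only a rough, untested sketch: it assumes the TimestampExtractor interface from kafka-connect-storage-common, and the package and class names are my own placeholders.

```java
package com.example.connect; // hypothetical package

import java.time.Instant;
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;

import io.confluent.connect.storage.partitioner.TimestampExtractor;

// Reads the "timestamp" field from the message payload; falls back to the
// record timestamp when the field is absent. Requires
// kafka-connect-storage-common on the classpath.
public class PayloadTimestampExtractor implements TimestampExtractor {

    @Override
    public void configure(Map<String, Object> config) {
        // no configuration needed for this sketch
    }

    @Override
    public Long extract(ConnectRecord<?> record) {
        // With value.converter.schemas.enable=false, the JsonConverter
        // deserializes the JSON object into a Map
        Object value = record.value();
        if (value instanceof Map) {
            Object ts = ((Map<?, ?>) value).get("timestamp");
            if (ts instanceof String) {
                // Parse the ISO-8601 string into epoch millis
                return Instant.parse((String) ts).toEpochMilli();
            }
        }
        // Fall back to the (possibly null) Kafka record timestamp
        return record.timestamp();
    }
}
```

The class would then be referenced by its fully qualified name in "timestamp.extractor".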

EDIT: Full config:

{ "name": "<name>",
  "config": {
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"4",
    "topics":"<topic>",
    "flush.size":"100",
    "s3.bucket.name":"<bucket name>",
    "s3.region": "<region>",
    "s3.part.size":"<partition size>",
    "rotate.schedule.interval.ms":"86400000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "locale":"ENGLISH",
    "timezone":"UTC",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor":"RecordField",
    "timestamp.field":"message.timestamp",
    "max.poll.interval.ms": "600000",
    "request.timeout.ms": "610000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "20000",
    "s3.acl.canned":"bucket-owner-full-control"
  }
}

EDIT 2: Kafka message payload structure:

{
  "reference": "",
  "clientId": "",
  "gid": "",
  "timestamp": "2019-03-19T15:27:55.526Z"
}

EDIT 3:

{
  "transforms": "convert_op_creationDateTime",
  "transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.convert_op_creationDateTime.target.type": "Timestamp",
  "transforms.convert_op_creationDateTime.field": "timestamp",
  "transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}

So I tried applying a transform to the object, but now I'm stuck on this instead: the pattern is rejected as invalid. From what I can find online it looks like a valid SimpleDateFormat pattern, yet the error seems to complain about the 'T'. I've updated the message schema above as well.
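For what it's worth, plain SimpleDateFormat accepts both the pattern and the sample value in a standalone check outside Connect, and the parsed epoch even matches the creationTimestamp from the message metadata above, so the pattern itself seems fine:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class PatternCheck {
    public static void main(String[] args) throws Exception {
        // Same pattern as in the transform config above
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
        Date parsed = fmt.parse("2019-04-02T06:27:02.667Z");
        // Epoch millis match the creationTimestamp from the message metadata
        System.out.println(parsed.getTime()); // 1554186422667
    }
}
```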

Upvotes: 1

Views: 5546

Answers (2)

OneCricketeer

Reputation: 191691

If the data is a string, then Connect will try to parse it as milliseconds - source code here.

In any case, message.timestamp assumes the data looks like { "message" : { "timestamp": ... } }, so just timestamp would be correct. And having nested fields didn't use to be possible anyway, so you might want to clarify which version of Connect you have.

I'm not entirely sure how you would get instanceof Date to evaluate to true when using the JSON Converter. Even if you had set schema.enable = true, you can see in the code that there are only conditions for schema types of numbers and strings, and it still assumes the value is milliseconds.

You can try using the TimestampConverter transformation to convert your date string.
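A sketch of how that could fit alongside the sink config (the transform alias is hypothetical; the key detail is that timestamp.field names the payload field directly, with no message. prefix):

```json
{
  "transforms": "tsConvert",
  "transforms.tsConvert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsConvert.field": "timestamp",
  "transforms.tsConvert.target.type": "Timestamp",
  "transforms.tsConvert.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "timestamp"
}
```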

Upvotes: 1

Robin Moffatt

Reputation: 32080

Based on the schema you've shared, you should be setting:

    "timestamp.extractor":"RecordField",
    "timestamp.field":"timestamp",

i.e. no message prefix to the timestamp field name.

Upvotes: 1
