Reputation: 91
I want to use TimeBasedPartitioner with the RecordField timestamp extractor for partitioning. Here are my configs:
"partition.duration.ms": "86400000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp",
"timezone": "UTC",
"locale": "US",
My input looks something like this:
{
"id": 5653383,
"service_type": 1,
"speed": 7,
"status": 4,
"timestamp": 1725089735
}
But all records are written to the 1970-01-20 partition instead of the expected date (2024-08-31 for the sample record above).
I also tried adding "transforms": "TimestampConverter" with these configs:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.unix.precision": "milliseconds",
But it still parsed all values as 1970-01-20.
Full connector config:
{
"aws.access.key.id": "******",
"aws.secret.access.key": "****",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": "50000",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"name": "test",
"parquet.codec": "snappy",
"partition.duration.ms": "86400000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"s3.bucket.name": "test",
"schema.compatibility": "NONE",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"store.url": "****",
"tasks.max": "1",
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp",
"timezone": "UTC",
"locale": "US",
"topics": "test",
"topics.dir": "",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.unix.precision": "milliseconds",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.schema.registry.url": ""
}
Upvotes: 0
Views: 28