Nestor

Reputation: 91

Problem with TimeBasedPartitioner in S3 sink connector

I want to use TimeBasedPartitioner with the RecordField timestamp extractor for partitioning. Here is my config:

"partition.duration.ms": "86400000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp",
"timezone": "UTC",
"locale": "US",

My input records look something like this:

{
    "id": 5653383,
    "service_type": 1,
    "speed": 7,
    "status": 4,
    "timestamp": 1725089735
}

But all records end up in the 1970-01-20 partition.
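
For context, here is how that raw value decodes when it is treated as epoch milliseconds versus epoch seconds (a standalone Java sketch just for illustration; the EpochCheck class name is made up):

    import java.time.Instant;

    public class EpochCheck {
        public static void main(String[] args) {
            long value = 1725089735L; // the "timestamp" field from the sample record above

            // Interpreted as epoch milliseconds -> the partition date I am actually getting
            System.out.println(Instant.ofEpochMilli(value));  // prints 1970-01-20T23:11:29.735Z

            // Interpreted as epoch seconds -> the date I would expect
            System.out.println(Instant.ofEpochSecond(value)); // prints 2024-08-31T07:35:35Z
        }
    }

The timestamps in my records are Unix seconds, so 2024-08-31 is the partition I would expect.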

I also tried adding a TimestampConverter transform ("transforms": "TimestampConverter") with this configuration:

    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.field": "timestamp",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.unix.precision": "milliseconds",

but it still parsed all values to 1970-01-20.

Full connector config:

{
    "aws.access.key.id": "******",
    "aws.secret.access.key": "****",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "flush.size": "50000",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "test",
    "parquet.codec": "snappy",
    "partition.duration.ms": "86400000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "s3.bucket.name": "test",
    "schema.compatibility": "NONE",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "store.url": "****",
    "tasks.max": "1",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "timestamp",
    "timezone": "UTC",
    "locale": "US",
    "topics": "test",
    "topics.dir": "",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.field": "timestamp",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.unix.precision": "milliseconds",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.avroRecordType": "GENERIC_RECORD",
    "value.converter.schema.registry.url": ""
}

Upvotes: 0

Views: 28

Answers (0)
