Reputation: 11
My team and I are struggling to consume the lag on some specific Kafka topics. We have an S3 sink connector configured like this:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: dataplatform-s3-sink-connector-1
  labels:
    strimzi.io/cluster: dataplatform
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 108
  config:
    topics: topic01, topic02, topic03, topic04, topic05, topic06, topic07, topic08, topic09
    s3.region: region
    s3.bucket.name: bucket-name
    s3.part.size: 5242880 # 5 MB
    flush.size: 50000
    rotate.schedule.interval.ms: 43200000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    store.url: our.URL
    value.converter: xx.yyyy.zzzzzz.connect.json.CustomJsonSchemaConverter
    value.converter.schemas.enable: true
    value.converter.schema.registry.url: schemaregistry.URL
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    transforms: TimestampConverter
    transforms.TimestampConverter.field: ts
    transforms.TimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.TimestampConverter.format: yyyy-MM-dd HH:mm:ss.SSSSSS
    transforms.TimestampConverter.target.type: Timestamp
    partitioner.class: xx.yyyy.zzzzzz.connect.storage.partitioner.CustomTimeBasedPartitioner
    timestamp.extractor: RecordField
    timestamp.field: ts
    partition.duration.ms: 86400000
    locale: it-IT
    timezone: UTC
    path.format: "'YEAR'=YYYY/'MONTH'=MM/'DAY'=dd"
    topics.dir: ""
    consumer.override.partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
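One direction we are considering is overriding throughput-related settings per connector under `config:`. The fragment below is only a sketch of what that would look like; every value is an assumption to experiment with, not a tested recommendation:

```yaml
# Illustrative tuning sketch -- all values are assumptions to experiment with
flush.size: 200000                           # larger Parquet files, fewer S3 uploads
s3.part.size: 26214400                       # 25 MB multipart chunks instead of 5 MB
consumer.override.fetch.min.bytes: 1048576   # let the broker batch ~1 MB per fetch
consumer.override.max.poll.records: 5000     # pull larger batches per poll
consumer.override.max.partition.fetch.bytes: 10485760
```

The `consumer.override.` prefix works the same way as the assignment-strategy override we already use at the end of the config above.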
We have ~2Bn records of cumulative lag across these 9 topics; ~99% of it concerns 3 massive topics (let's say 07, 08 and 09) that have a heavy flow of incoming data (~30k-50k records per minute). We found out that, per partition (12 partitions per topic), we are only able to write one Parquet file of 50k rows (~3 MB in size) to the object storage every ~20 minutes.
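A back-of-the-envelope check using the figures above (taking the upper bound of the stated ingest rate) shows why the lag keeps growing on the big topics: each partition is drained more slowly than data arrives.

```python
# Back-of-the-envelope lag check; all inputs are the figures from the question.
incoming_per_topic_per_min = 50_000  # upper bound of the stated 30k-50k rec/min
partitions_per_topic = 12
rows_per_file = 50_000               # flush.size
minutes_per_file = 20                # observed time to write one file per partition

inflow_per_partition = incoming_per_topic_per_min / partitions_per_topic
drain_per_partition = rows_per_file / minutes_per_file

print(f"inflow:  {inflow_per_partition:.0f} rec/min per partition")
print(f"drain:   {drain_per_partition:.0f} rec/min per partition")
print(f"deficit: {inflow_per_partition - drain_per_partition:.0f} rec/min per partition")
```

At 50k rec/min per topic this gives ~4167 rec/min arriving per partition against only 2500 rec/min written, so the lag grows by roughly 1600-1700 records per minute per partition.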
We have a total of 20 connectors running (most of them keep pace with their lag), circa 550 tasks overall and 12 worker replicas to host them (circa 45-46 task instances per worker). Each worker has 8 GB of dedicated JVM memory and 2 CPUs. We are using Kafka v2.7.0 distributed over a Kubernetes cluster with Strimzi.
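For what it's worth, a rough heap estimate for the S3 buffers (under the simplifying assumptions of one assigned partition per task and one open 5 MB multipart buffer per partition, ignoring the Parquet writer's own footprint) suggests the upload buffers themselves are small relative to the 8 GB heap:

```python
# Rough per-worker S3 buffer estimate; assumptions: ~45 tasks hosted per
# worker, one assigned partition per task, one open 5 MB part buffer each.
tasks_per_worker = 550 // 12   # ~45 tasks per worker replica
part_size_mb = 5               # s3.part.size = 5242880 bytes
buffers_mb = tasks_per_worker * part_size_mb
print(f"~{buffers_mb} MB of S3 part buffers per 8 GB worker")
```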
Is there a way to tune this connector so it digests the lag more quickly? Do we have to add more resources to our workers, or scale out the number of workers?
Any best practice is welcome. I'm available to give further details if needed.
Thank you for your help.
To improve its performance, we gave only this specific connector the StickyAssignor partition assignment strategy, while all the others run with the RangeAssignor (the last parameter of the configuration above). We noticed a small improvement, but not a sufficient one.
Upvotes: 1
Views: 140