Francesco Pelacani

Is there a way to reduce lag in a Kafka Consumer group?

My team and I are struggling with consumer lag on specific Kafka topics. We have an S3 sink connector configured like this:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: dataplatform-s3-sink-connector-1
  labels:
    strimzi.io/cluster: dataplatform
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 108
  config:
    topics: topic01, topic02, topic03, topic04, topic05, topic06, topic07, topic08, topic09
    s3.region: region
    s3.bucket.name: bucket-name
    s3.part.size: 5242880 # 5 MiB
    flush.size: 50000
    rotate.schedule.interval.ms: 43200000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    store.url: our.URL
    value.converter:  xx.yyyy.zzzzzz.connect.json.CustomJsonSchemaConverter
    value.converter.schemas.enable: true
    value.converter.schema.registry.url: schemaregistry.URL
    key.converter:  org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    transforms: TimestampConverter
    transforms.TimestampConverter.field: ts
    transforms.TimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.TimestampConverter.format: yyyy-MM-dd HH:mm:ss.SSSSSS
    transforms.TimestampConverter.target.type: Timestamp
    partitioner.class: xx.yyyy.zzzzzz.connect.storage.partitioner.CustomTimeBasedPartitioner
    timestamp.extractor: RecordField
    timestamp.field: ts
    partition.duration.ms: 86400000
    locale: it-IT
    timezone: UTC
    path.format: "'YEAR'=YYYY/'MONTH'=MM/'DAY'=dd"
    topics.dir: ""
    consumer.override.partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor

We have about 2 billion records of cumulative lag across these 9 topics. Roughly 99% of it comes from 3 massive topics (let's say 07, 08 and 09) that receive a heavy flow of incoming data (~30k-50k records per minute). We found that, per partition, we can only write one Parquet file of 50k rows (~3 MB) to the object storage every ~20 minutes; each topic has 12 partitions.
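As a rough sanity check on these figures, here is a minimal sketch that compares the observed write rate with the incoming rate. It assumes the ~30k-50k records per minute is a per-topic figure (the post does not say whether it is per topic or a total) and that all 12 partitions of a topic are drained in parallel; the function and constant names are illustrative only.

```python
# Figures quoted in the question. Reading the inflow as "per topic" is an assumption.
PARTITIONS_PER_TOPIC = 12
ROWS_PER_FILE = 50_000               # matches flush.size in the connector config
MINUTES_PER_FILE = 20                # observed time to produce one file per partition
INFLOW_PER_MIN = (30_000, 50_000)    # low and high end of the quoted incoming rate


def drain_rate_per_min(partitions: int, rows_per_file: int, minutes_per_file: float) -> float:
    """Records per minute written for one topic if every partition produces
    one file of `rows_per_file` rows every `minutes_per_file` minutes."""
    return partitions * rows_per_file / minutes_per_file


rate = drain_rate_per_min(PARTITIONS_PER_TOPIC, ROWS_PER_FILE, MINUTES_PER_FILE)

for inflow in INFLOW_PER_MIN:
    net = rate - inflow  # positive: lag shrinks; zero or negative: lag never drains
    print(f"inflow={inflow}/min  drain={rate:.0f}/min  net={net:.0f}/min")
```

Under those assumptions the connector writes about as fast as the low end of the inflow and slower than the high end, so the existing lag would not shrink without higher per-partition throughput.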

We have a total of 20 connectors running (most of them keep pace with their topics), about 550 tasks in total, and 12 worker replicas to host them (roughly 44-45 tasks per worker). Each worker has 8 GB of dedicated JVM memory and 2 CPUs. We are using Kafka 2.7.0 deployed on a Kubernetes cluster with Strimzi.
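To put the worker sizing in perspective, the following sketch divides the quoted resources by the quoted task counts. It assumes tasks are spread evenly across workers and that all nine topics have 12 partitions each (the post states 12 partitions per topic); the variable names are illustrative only.

```python
# Cluster figures quoted in the question.
TOTAL_TASKS = 550          # "circa 550" across all 20 connectors
WORKERS = 12
CPUS_PER_WORKER = 2
HEAP_GB_PER_WORKER = 8

# S3 sink connector figures from the config above.
S3_SINK_TASKS = 108        # tasksMax
TOPICS = 9
PARTITIONS_PER_TOPIC = 12  # assumed to hold for all nine topics

# Assumes an even spread of tasks over workers.
tasks_per_worker = TOTAL_TASKS / WORKERS
tasks_per_cpu = tasks_per_worker / CPUS_PER_WORKER
heap_mb_per_task = HEAP_GB_PER_WORKER * 1024 / tasks_per_worker
partitions_per_sink_task = TOPICS * PARTITIONS_PER_TOPIC / S3_SINK_TASKS

print(f"tasks per worker:         {tasks_per_worker:.1f}")
print(f"tasks per CPU:            {tasks_per_cpu:.1f}")
print(f"heap per task (MB):       {heap_mb_per_task:.0f}")
print(f"partitions per sink task: {partitions_per_sink_task:.1f}")
```

With these inputs each S3 sink task already handles a single partition, so raising tasksMax further would not add parallelism; the per-task share of CPU and heap on each worker is the figure that stands out.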

Is there a way to tune this connector so that it works through the lag more quickly? Do we need to give our workers more resources? Do we need to scale out the number of workers?

Any best practices are welcome. I can provide further details if needed.

Thank you for your help.

To improve performance, we switched this specific connector to the StickyAssignor partition assignment strategy (the last parameter in the configuration above), while all the others still run with RangeAssignor. We noticed a small improvement, but not a sufficient one.
