정현우

Reputation: 31

Flink Job Processes Kafka Messages Twice After JobManager Failover in HA Mode

I am using Beam's KafkaIO.Read function to create a streaming processing pipeline, running it as a Flink job. In simple terms, the job receives messages from Kafka and inserts them into a database. Usually, it operates without any issues, but under specific conditions, I've observed abnormal behavior.

We are using Flink in High Availability (HA) mode with multiple JobManagers. We wanted to confirm whether the job continues processing without issues when the leader JobManager crashes. Since we are operating on Kubernetes, we used the delete pod command to kill the leader JobManager.

After that, we confirmed that the standby JobManager properly recovered and re-executed the job. Up to this point, everything was fine. However, after recovery, for some reason, Kafka messages started being processed twice.

This is not about partial duplicate processing due to discrepancies between checkpoints and Kafka offset progress. All new Kafka messages added after recovery were processed twice.

Upon further investigation, we found that although only one job was registered on the JobManager, the TaskManager appeared to be executing that job twice. (Only one job was displayed in the Flink UI, and it was running in application mode.)

We also confirmed that restarting the TaskManager during the occurrence of the issue restored normal operation.

My assumption is that the JobManager needs to stop the tasks running on the TaskManager before it shuts down, and because it crashed before it could do so, the issue occurred. The basis for this is that when we tested by stopping PostgreSQL (the sink target) so that the job failed on the JobManager, the same issue did not occur even when the JobManager was subsequently killed. So it seems that when the shutdown is not unexpected, the tasks running on the TaskManager are stopped correctly.

Based on this assumption, I thought that shutting down the JobManager gracefully might resolve the issue. However, since my understanding is that Flink already shuts down gracefully by default, I am not sure what to change to fix it.

In conclusion, I would like to confirm the following:

  1. What is causing this issue?
  2. What can be done to resolve it? If anyone is familiar with this issue, I would appreciate your insights.

Below are the code snippets for the KafkaIO part and the Flink/Beam configurations.

Beam KafkaIO part

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
                        .withBootstrapServers(options.getBootstrapServerList())
                        .withTopic("test-topic")
                        .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test_group"))
                        .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"))
                        .commitOffsetsInFinalize()
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata())

Beam build.gradle

dependencies {
    // App dependencies.
    implementation "org.apache.beam:beam-sdks-java-core:2.56.0"
    implementation "org.apache.beam:beam-runners-direct-java:2.56.0"
    implementation "org.slf4j:slf4j-jdk14:1.7.32"
    implementation "org.apache.kafka:kafka-clients:2.8.1"
    implementation "org.apache.beam:beam-sdks-java-io-kafka:2.56.0"
    implementation 'org.apache.beam:beam-sdks-java-io-jdbc:2.56.0'
    implementation 'org.apache.beam:beam-runners-flink-1.17:2.56.0'
    runtimeOnly "org.postgresql:postgresql:42.2.27"
    runtimeOnly "org.hamcrest:hamcrest:2.2"

    implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.12.3'
    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.3'

    // Tests dependencies.
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

Flink configuration manifest file

flink-conf.yaml: |+
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    kubernetes.namespace: test-namespace
    kubernetes.service-account: flink-service-account
    high-availability.type: kubernetes
    high-availability.storageDir: hdfs://xx.xx.xx.xx:9000/flink/blue/ha
    kubernetes.cluster-id: cluster20
    jobmanager.execution.failover-strategy: full
    restart-strategy.type: fixed-delay
    restart-strategy.fixed-delay.delay: 5 s
    restart-strategy.fixed-delay.attempts: 10
    execution.checkpointing.interval: 10s
    state.checkpoints.dir: hdfs://xx.xx.xx.xx:9000/flink/app_mode/checkpoints
    state.checkpoint-storage: filesystem
    state.checkpoints.num-retained: 3
    parallelism.default: 6
    taskmanager.numberOfTaskSlots: 6
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m

(IP addresses, namespace names, etc. are masked; the Flink version is 1.17.2)

Upvotes: 0

Views: 104

Answers (2)

Jan Lukavsky

Reputation: 131

There was a bug, recently fixed, that might have caused this. The fix should be released as part of Beam 2.63.0.
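
If that bug is indeed the cause, the straightforward mitigation would be to bump the Beam version once 2.63.0 is released. Below is a minimal sketch of the build.gradle change, assuming the beam-runners-flink-1.17 artifact is still published for that release (please verify against the release notes):

dependencies {
    // Keep all Beam artifacts on the same version, including beam-runners-direct-java,
    // and bump them together to the release containing the fix.
    implementation "org.apache.beam:beam-sdks-java-core:2.63.0"
    implementation "org.apache.beam:beam-sdks-java-io-kafka:2.63.0"
    implementation "org.apache.beam:beam-sdks-java-io-jdbc:2.63.0"
    implementation "org.apache.beam:beam-runners-flink-1.17:2.63.0"
    // ... keep the remaining dependencies unchanged
}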

Upvotes: 0

Ignotus

Reputation: 1

I am not familiar with Apache Beam, but I believe the issue might be related to Kafka not committing the offsets in time. After data is written to the database, the offsets may not be committed immediately: the consumer waits until all messages in the current batch (by default 500 messages, due to the max.poll.records setting) are processed. This can lead to reprocessing if the system fails or restarts before the offsets are committed.

To reduce the number of duplicates processed in case of failure, you can:

  1. Reduce max.poll.records: This will limit the number of messages in each batch, meaning that fewer messages will be at risk of duplication if something goes wrong mid-batch.
  2. Manually commit offsets: after each database write, commit the Kafka offsets so that messages are not reprocessed after a rebalance or restart.

For more information, you can refer to this article, which explains it nicely: Offset Management
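
As a concrete sketch of point 1, in the KafkaIO snippet from the question the batch size could be lowered through the existing withConsumerConfigUpdates mechanism. The value 100 below is purely an illustrative assumption, and I am not certain this addresses the post-failover duplication described above:

// Sketch: lower max.poll.records so fewer messages are in flight
// between offset commits (the value 100 is an assumed example).
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
        .withBootstrapServers(options.getBootstrapServerList())
        .withTopic("test-topic")
        .withConsumerConfigUpdates(ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "test_group",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100))
        .commitOffsetsInFinalize()
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())

Regarding point 2, KafkaIO does not expose per-record manual commits directly; commitOffsetsInFinalize(), which the question already uses, is the mechanism that ties offset commits to checkpoint finalization on the Flink runner.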

Upvotes: 0
