Reputation: 31
I am using Beam's KafkaIO.Read transform to build a streaming pipeline that runs as a Flink job. In simple terms, the job receives messages from Kafka and inserts them into a database.
Usually, it operates without any issues, but under specific conditions, I've observed abnormal behavior.
We are using Flink in High Availability (HA) mode with multiple JobManagers. We wanted to confirm whether the job continues processing without issues when the leader JobManager crashes. Since we are operating on Kubernetes, we used the delete pod command to kill the leader JobManager.
After that, we confirmed that the standby JobManager properly recovered and re-executed the job. Up to this point, everything was fine. However, after recovery, for some reason, Kafka messages started being processed twice.
This is not about partial duplicate processing due to discrepancies between checkpoints and Kafka offset progress. All new Kafka messages added after recovery were processed twice.
Upon further investigation, we found that although only one job was running on the JobManager, it appeared that the TaskManager was executing the job twice. (Only one job was displayed in the Flink UI, and it was running in application mode.)
We also confirmed that restarting the TaskManager during the occurrence of the issue restored normal operation.
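One way we could cross-check this symptom is to compare what the Flink REST API reports for jobs and task slots. The following is only a rough sketch (the JobManager address is a placeholder); if the occupied slots exceed the single job's parallelism, that would hint at duplicate tasks:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkRestCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://flink-jobmanager:8081"; // placeholder REST address

        // Lists the jobs the JobManager knows about (should be exactly one here).
        HttpRequest jobs = HttpRequest.newBuilder(URI.create(base + "/jobs/overview")).build();
        System.out.println(client.send(jobs, HttpResponse.BodyHandlers.ofString()).body());

        // Shows the registered TaskManagers and their slot usage.
        HttpRequest tms = HttpRequest.newBuilder(URI.create(base + "/taskmanagers")).build();
        System.out.println(client.send(tms, HttpResponse.BodyHandlers.ofString()).body());
    }
}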
My assumption is that the JobManager needs to stop the tasks running on the TaskManager before it shuts down, but because it crashed it could not do so, and that is what caused the issue. The reason I think so is that when we tested by stopping PostgreSQL (the sink target), the job failed on the JobManager, and in that scenario the same issue did not occur even when the JobManager then went down. So it seems that when the shutdown is not unexpected, the tasks running on the TaskManager are stopped correctly.
Based on this assumption, I thought that shutting down the JobManager gracefully might resolve the issue. However, since my understanding is that Flink already shuts down gracefully by default, I am not sure what I should change to fix this.
In conclusion, I would like to confirm what I should change so that Kafka messages are not processed twice after a JobManager failover.
Below are the code snippets for the KafkaIO part and the Flink/Beam configurations.
Beam KafkaIO part
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers(options.getBootstrapServerList())
.withTopic("test-topic")
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test_group"))
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"))
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
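(The write side of the pipeline is not shown above. It roughly follows the usual JdbcIO pattern; the connection URL, table, and column names in this sketch are placeholder values, not the actual setup.)
    .apply("WriteToPostgres", JdbcIO.<KV<String, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("org.postgresql.Driver", "jdbc:postgresql://db-host:5432/testdb") // placeholder URL
            .withUsername("user")      // placeholder credentials
            .withPassword("password"))
        .withStatement("INSERT INTO messages (key, value) VALUES (?, ?)") // placeholder table/columns
        .withPreparedStatementSetter((KV<String, String> element, java.sql.PreparedStatement stmt) -> {
            stmt.setString(1, element.getKey());
            stmt.setString(2, element.getValue());
        }));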
Beam build.gradle
dependencies {
    // App dependencies.
    implementation "org.apache.beam:beam-sdks-java-core:2.56.0"
    implementation "org.apache.beam:beam-runners-direct-java:2.56.0"
    implementation "org.slf4j:slf4j-jdk14:1.7.32"
    implementation "org.apache.kafka:kafka-clients:2.8.1"
    implementation "org.apache.beam:beam-sdks-java-io-kafka:2.56.0"
    implementation "org.apache.beam:beam-sdks-java-io-jdbc:2.56.0"
    implementation "org.apache.beam:beam-runners-flink-1.17:2.56.0"
    runtimeOnly "org.postgresql:postgresql:42.2.27"
    runtimeOnly "org.hamcrest:hamcrest:2.2"
    implementation "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.12.3"
    implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.12.3"

    // Test dependencies.
    testImplementation "junit:junit:4.13.2"
    testImplementation "org.hamcrest:hamcrest:2.2"
}
Flink configuration manifest file
flink-conf.yaml: |+
  blob.server.port: 6124
  jobmanager.rpc.port: 6123
  taskmanager.rpc.port: 6122
  kubernetes.namespace: test-namespace
  kubernetes.service-account: flink-service-account
  high-availability.type: kubernetes
  high-availability.storageDir: hdfs://xx.xx.xx.xx:9000/flink/blue/ha
  kubernetes.cluster-id: cluster20
  jobmanager.execution.failover-strategy: full
  restart-strategy.type: fixed-delay
  restart-strategy.fixed-delay.delay: 5 s
  restart-strategy.fixed-delay.attempts: 10
  execution.checkpointing.interval: 10s
  state.checkpoints.dir: hdfs://xx.xx.xx.xx:9000/flink/app_mode/checkpoints
  state.checkpoint-storage: filesystem
  state.checkpoints.num-retained: 3
  parallelism.default: 6
  taskmanager.numberOfTaskSlots: 6
  jobmanager.memory.process.size: 2048m
  taskmanager.memory.process.size: 2048m
(IP addresses, namespace names, etc. are masked. The Flink version is 1.17.2.)
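As a side note on the checkpointing settings above: with the Beam Flink runner, the checkpoint interval can also be set on the Beam side via FlinkPipelineOptions. This is only a rough sketch, where the values simply mirror the flink-conf settings and are not taken from the actual job:
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions flinkOptions =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
flinkOptions.setRunner(FlinkRunner.class);
flinkOptions.setCheckpointingInterval(10_000L); // 10s, in milliseconds, matching flink-conf
flinkOptions.setParallelism(6);                 // matching parallelism.default above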
Upvotes: 0
Views: 104
Reputation: 131
There was a bug, recently fixed, that might have caused this. The fix should be released as part of Beam 2.63.0.
Upvotes: 0
Reputation: 1
I am not familiar with Apache Beam, but I believe the issue might be related to Kafka not committing the offsets in time. After writing data to the database, the offsets may not be committed immediately: Kafka waits until all messages in the batch (by default 500 messages, due to the max.poll.records setting) are processed. This can lead to reprocessing if the system fails or restarts before the offsets are committed.
To reduce the number of duplicates processed in case of failure, you can lower max.poll.records so that offsets are committed in smaller batches. In Beam's KafkaIO this can presumably be passed through the consumer config updates, as in the sketch below (the value 100 is just an example, and the broker/topic are placeholders):
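// Sketch: pass a smaller max.poll.records to the underlying Kafka consumer,
// so each polled batch (and the offsets committed for it) is smaller.
KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")   // placeholder
    .withTopic("test-topic")
    .withConsumerConfigUpdates(ImmutableMap.of(
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100))
    .commitOffsetsInFinalize()
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class);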
For more information, you can refer to this article, which explains it nicely: Offset Management
Upvotes: 0