Reputation: 792
After digging through many SO posts and even JIRA issues, I don't know where to look anymore. Every single checkpoint in Flink fails due to timeout, in the exception section for the job it shows following error, however the job itself does not fail:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 600000milliseconds while awaiting InitProducerId
When disabling checkpointing, everything regarding Kafka works as expected, so my assumption is it may have something to do with the checkpoint waiting for a Kafka commit in order to be acknowledged (Semantic
is set to EXACTLY_ONCE
). I remember reading about timeout mismatch leading to problems, so I have aligned the TRANSACTION_TIMEOUT_CONFIG
in my FlinkKafkaProducer to 900000
milliseconds.
I also tweaked the TransactionTimeout and the MaxBlockMS, as suggested in this issue, which has a lot of discussion going on right now about this exact same error but apparently no solution.
The Flink book "Stream Processing with Apache Flink" suggests to carefully revise Kafka configurations, e.g. acks
, log.flush.interval.messages
, log.flush.interval.ms
and log.flush.*
. We already had this working under Flink 1.9.1 though, but since we upgraded to 1.11.1 it does not work anymore. I don't know if somebody touched Kafka settings in the meanwhile, but as far as I can see most of those setting are left to default, except for log.flush.interval=10000
. We are using Confluent 5.3.3
as before, which means Kafka 2.3.1
.
Also, the Flink job is deployed in a single-node environment, so it should have access to the file system, the whole directory is owned by the user running the Flink service (was a suggested solution in another SO thread).
Does anybody have an idea what causes the failure of those checkpoints?
Upvotes: 2
Views: 5677
Reputation: 792
Well after having a massive headache about this, I finally found the issue: The Kafka settings were in fact altered, as the transaction.state.log.replication.factor
was lower than the transaction.state.log.min.isr
, leading to no transaction actually succeeding, as there are never enough in-sync replicas of the Kafka topic.
Upvotes: 7