Reputation: 21
Hi, I am new to Flink and trying to figure out some best practices for the following scenario:
I am playing around with a Flink job that reads unique data from multiple CSV files. Each row in the CSV has three columns: userId, appId, name. I have to do some processing on each record (capitalize the name) and write it to a Kafka topic.
The goal is to filter out any duplicate records that exist so we do not have duplicate messages in the output Kafka Topic.
I am doing a keyBy(userId, appId) on the stream and keeping a boolean ValueState "Processed" to filter out duplicate records.
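Flink specifics aside, the per-key logic described above amounts to "emit a record only the first time its (userId, appId) pair is seen, with the name capitalized". A minimal plain-Java sketch of just that logic (class and method names are illustrative; in the actual job the "seen" flag would live in a keyed `ValueState<Boolean>` inside a `KeyedProcessFunction`, not an in-memory set):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative stand-in for the job's per-key dedup logic. In the real Flink
// job, the "seen" flag is per-key managed state, not a shared in-memory set.
public class DedupSketch {
    // Tracks which (userId, appId) pairs have already been processed.
    private final Set<String> seen = new HashSet<>();

    // Returns the capitalized name if the record is new, or null for a duplicate.
    public String process(String userId, String appId, String name) {
        String key = userId + "|" + appId;
        if (!seen.add(key)) {
            return null; // duplicate: this key was already processed
        }
        return name.toUpperCase();
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        List<String> out = new ArrayList<>();
        String[][] rows = {
            {"u1", "appA", "alice"},
            {"u2", "appA", "bob"},
            {"u1", "appA", "alice"}, // duplicate, should be dropped
        };
        for (String[] row : rows) {
            String result = dedup.process(row[0], row[1], row[2]);
            if (result != null) {
                out.add(result);
            }
        }
        System.out.println(out);
    }
}
```

The failure scenario below boils down to this: the in-memory equivalent of `seen` is lost on a crash unless Flink snapshots it.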
The issue is that when I kill the Task Manager in the middle of processing a file (to simulate a failure), it starts processing the file from the beginning once it restarts. This is a problem because the "Processed" state in the Flink job is also wiped clean when the Task Manager fails, which leads to duplicate messages in the output Kafka topic.
How can I prevent this?
I need to restore the "Processed" Flink state to what it was prior to the Task Manager failing. What is the best practice to do this?
Would Flink's CheckpointedFunction (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction) help? I think not, because this is a keyed stream.
Thank you for the help!
Upvotes: 2
Views: 805
Reputation: 43439
You need to use Flink's managed keyed state, which Flink maintains in a manner consistent with the sources and sinks. Flink will guarantee exactly-once behavior provided you set up checkpointing and configure the sinks properly.
There's a tutorial on this topic in the Flink documentation, and it happens to use deduplication as the example use case.
For more info on how to configure Kafka for exactly-once operation, see Exactly-once with Apache Kafka.
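Concretely, the setup this answer describes looks roughly like the following configuration sketch (written against the `KafkaSink` API from the `flink-connector-kafka` artifact; the broker address, topic name, checkpoint interval, and transactional-id prefix are all placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 30s. Managed keyed state (e.g. the "Processed"
        // flag) is snapshotted with each checkpoint and restored on recovery.
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);

        // Exactly-once sink: records are written in Kafka transactions that
        // are committed when a checkpoint completes.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")        // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")             // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("dedup-job")        // required for EXACTLY_ONCE
                .build();
    }
}
```

Note that with a transactional sink, downstream Kafka consumers only see uncommitted duplicates filtered out if they read with `isolation.level=read_committed`.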
Disclaimer: I wrote that part of the Flink documentation, and I work for Immerok.
Upvotes: 0
Reputation: 2068
I would recommend reading up on Flink's fault-tolerance mechanisms: checkpointing and savepointing. https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/ and https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/ are good places to start.
I think you could also achieve your deduplication more easily by using the Table API/SQL. See https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/
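For reference, the SQL deduplication pattern in those docs keeps only the first row per key via `ROW_NUMBER()`. A sketch, assuming a registered source table named `input_csv` with a processing-time attribute `proctime` (both names are placeholders for whatever your table definition uses):

```sql
SELECT userId, appId, UPPER(name) AS name
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY userId, appId
           ORDER BY proctime ASC
         ) AS row_num
  FROM input_csv
)
WHERE row_num = 1;
```

The `row_num = 1` filter is what Flink's planner recognizes as a deduplication query, so the state it keeps per (userId, appId) key is covered by checkpoints just like hand-written keyed state.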
Upvotes: 0