ChrisRTech

Reputation: 577

Flink getting past bad messages in Kafka: "poison message"

This is the first time I'm trying to get this to work, so bear with me. I'm trying to learn checkpointing with Kafka and how to handle "bad" messages, restarting without losing state.

Use case: Use checkpointing. Read a stream of integers from Kafka and keep a running sum. If a "bad" Kafka message is read, restart the app, skip the "bad" message, and keep state. My stream would look something like this:

set1,5
set1,7
set1,foobar
set1,6

I want my app to keep a running sum of the integers it has seen and, if it crashes, restart without losing state, so the app behavior/running sum would be:
5
12
(app crashes and restarts, reads checkpoint)
18
etc.

However, I'm finding that when my app restarts, it keeps reading the bad "foobar" message and doesn't get past it. Source code is below. The mapper fails with a NumberFormatException when I try to parse "foobar" as an Integer. How can I modify the app to get past this "poison" message?
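A toy model of the restart behavior described above, in plain Java (no Flink APIs; ReplayLoopDemo and simulate are made-up names for illustration). As a simplifying assumption, a checkpoint completes after every successfully processed record. Each failed attempt restores the source offset and the running sum from that checkpoint, and the restored offset still points at the "foobar" record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink code): a job that restores its source offset and its
// running sum from the last successful checkpoint every time it fails.
final class ReplayLoopDemo {

    // Runs up to maxAttempts "job attempts"; returns the offset each attempt started from.
    static List<Integer> simulate(String[] log, int maxAttempts) {
        List<Integer> restartOffsets = new ArrayList<>();
        int checkpointedOffset = 0; // next record to read after a restore
        int checkpointedSum = 0;    // keyed state saved in the checkpoint

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int offset = checkpointedOffset;
            int sum = checkpointedSum;
            restartOffsets.add(offset);
            try {
                while (offset < log.length) {
                    String value = log[offset].split(",")[1];
                    sum += Integer.parseInt(value); // throws on "foobar"
                    offset++;
                    // Simplification: checkpoint after every processed record.
                    checkpointedOffset = offset;
                    checkpointedSum = sum;
                }
                return restartOffsets; // reached the end without failing
            } catch (NumberFormatException e) {
                // The job fails; the next attempt restores the last checkpoint,
                // whose offset still points at the bad record.
            }
        }
        return restartOffsets;
    }

    public static void main(String[] args) {
        String[] log = {"set1,5", "set1,7", "set1,foobar", "set1,6"};
        System.out.println(simulate(log, 3)); // [0, 2, 2]
    }
}
```

Every attempt after the first resumes at index 2, the "foobar" record, so the job never advances past it no matter how many restarts it is given.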

    env.enableCheckpointing(1000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
    env.getCheckpointConfig().setCheckpointTimeout(10000);
    env.setStateBackend(new FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", BROKERS);
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
    properties.setProperty("group.id", "consumerGroup1");

    FlinkKafkaConsumer08<String> kafkaConsumer =
            new FlinkKafkaConsumer08<>(topicName, new SimpleStringSchema(), properties);
    DataStream<String> messageStream = env.addSource(kafkaConsumer);

    DataStream<Tuple2<String, Integer>> sums = messageStream
            .map(new NumberMapper())
            .keyBy(0)
            .sum(1);
    sums.print();

    env.execute("Kafka running sum"); // job name is illustrative


    private static class NumberMapper implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String input) throws Exception {
            return parseData(input);
        }

        private Tuple2<String, Integer> parseData(String record) {
            String[] tokens = record.toLowerCase().split(",");

            // Get key
            String key = tokens[0];

            // Get integer value
            String integerValue = tokens[1];
            System.out.println("Trying to Parse=" + integerValue);
            Integer value = Integer.parseInt(integerValue); // throws NumberFormatException for "foobar"

            // Build tuple
            return new Tuple2<String, Integer>(key, value);
        }
    }

Upvotes: 1

Views: 532

Answers (1)

Till Rohrmann

Reputation: 13346

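Restarting alone won't get you past the bad record: the job restores the last completed checkpoint, which rewinds the Kafka consumer to an offset before "foobar", so the same exception is thrown again on every restart. Instead, you could change the NumberMapper into a FlatMapFunction that filters out invalid elements, and call .flatMap(new NumberMapper()) instead of .map(new NumberMapper()) in the pipeline: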

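    // Imports needed in the enclosing file:
    // import java.util.Optional;
    // import org.apache.flink.api.common.functions.FlatMapFunction;
    // import org.apache.flink.api.java.tuple.Tuple2;
    // import org.apache.flink.util.Collector;

    private static class NumberMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Optional<Tuple2<String, Integer>> optionalResult = parseData(input);

            // Emit only records that parsed; invalid ones produce no output
            optionalResult.ifPresent(collector::collect);
        }

        private Optional<Tuple2<String, Integer>> parseData(String record) {
            String[] tokens = record.toLowerCase().split(",");

            // A record without a value field (e.g. "foobar") would otherwise
            // throw ArrayIndexOutOfBoundsException on tokens[1]
            if (tokens.length < 2) {
                return Optional.empty();
            }

            // Get key
            String key = tokens[0];

            // Get integer value
            String integerValue = tokens[1];

            try {
                Integer value = Integer.parseInt(integerValue);
                // Build tuple
                return Optional.of(Tuple2.of(key, value));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        }
    }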
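The effect of the flatMap approach on the sample stream can be checked without a Flink cluster. The sketch below is a plain-Java model, not Flink code: Map.Entry stands in for Tuple2, a java.util.function.Consumer stands in for Collector, and a HashMap stands in for the keyed state behind keyBy(0).sum(1). The class name SkipBadRecordsDemo is hypothetical, and it assumes Java 9+ for Map.entry and List.of.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Plain-Java model of flatMap(NumberMapper) -> keyBy(key) -> sum(value).
// Map.Entry stands in for Flink's Tuple2 and a Consumer for its Collector.
final class SkipBadRecordsDemo {

    // Same idea as parseData above: malformed records become Optional.empty().
    static Optional<Map.Entry<String, Integer>> parseData(String record) {
        String[] tokens = record.toLowerCase().split(",");
        if (tokens.length < 2) {
            return Optional.empty(); // no value field at all
        }
        try {
            return Optional.of(Map.entry(tokens[0], Integer.parseInt(tokens[1])));
        } catch (NumberFormatException e) {
            return Optional.empty(); // e.g. "set1,foobar"
        }
    }

    // flatMap: emit zero or one parsed record per input record.
    static void flatMap(String input, Consumer<Map.Entry<String, Integer>> collector) {
        parseData(input).ifPresent(collector);
    }

    // Emits the updated per-key sum after every accepted record, like keyBy(0).sum(1).
    static List<String> runningSums(List<String> records) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String record : records) {
            flatMap(record, entry -> {
                int total = sums.merge(entry.getKey(), entry.getValue(), Integer::sum);
                emitted.add("(" + entry.getKey() + "," + total + ")");
            });
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("set1,5", "set1,7", "set1,foobar", "set1,6");
        System.out.println(runningSums(stream)); // [(set1,5), (set1,12), (set1,18)]
    }
}
```

This produces the running sum the question asks for (5, 12, 18) without any restart. The trade-off is that malformed records disappear silently; in a real job you would probably want to at least log or count them.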

Upvotes: 2
