Jicaar

Reputation: 1104

Way to read data from Kafka headers in Apache Flink

I have a project where I am consuming data from Kafka. Apparently, there are a couple fields that are going to be included in the headers that I will need to read as well for each message. Is there a way to do this in Flink currently?

Thanks!

Upvotes: 2

Views: 4069

Answers (3)

Chandan Bansal

Reputation: 49

Here's the code for newer versions of Flink, using the `KafkaSource` API:

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(ParameterConfig.parameters.getRequired(ParameterConstant.KAFKA_ADDRESS))
            .setTopics(ParameterConfig.parameters.getRequired(ParameterConstant.KAFKA_SOURCE_TOPICS))
            .setGroupId(ParameterConfig.parameters.getRequired(ParameterConstant.KAFKA_SOURCE_GROUPID))
            .setStartingOffsets(OffsetsInitializer.latest())
            .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
                @Override
                public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) {
                    try {
                        // Collect every record header into a key -> value map;
                        // keep the last value if a header key repeats.
                        Map<String, String> headers = StreamSupport
                                .stream(consumerRecord.headers().spliterator(), false)
                                .collect(Collectors.toMap(
                                        Header::key,
                                        h -> new String(h.value(), StandardCharsets.UTF_8),
                                        (first, second) -> second));
                        // Emit the headers downstream as a JSON string.
                        collector.collect(new JSONObject(headers).toString());
                    } catch (Exception e) {
                        log.error("Headers not found in Kafka stream with consumer record: {}", consumerRecord, e);
                    }
                }

                @Override
                public TypeInformation<String> getProducedType() {
                    return TypeInformation.of(new TypeHint<>() {});
                }
            })
            .build();
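The header-to-map conversion inside the deserializer can be exercised on its own. Below is a minimal, self-contained sketch of that same `StreamSupport` pipeline; note that `Header` here is a hypothetical stand-in for Kafka's `org.apache.kafka.common.header.Header` interface, and `toMap` is just an illustrative helper name:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class HeaderMapDemo {

    // Hypothetical stand-in for org.apache.kafka.common.header.Header
    record Header(String key, byte[] value) {}

    // Same stream pipeline as in the deserializer above
    static Map<String, String> toMap(Iterable<Header> headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .collect(Collectors.toMap(
                        Header::key,
                        h -> new String(h.value(), StandardCharsets.UTF_8),
                        (first, second) -> second)); // last value wins on duplicate keys
    }

    public static void main(String[] args) {
        List<Header> headers = List.of(
                new Header("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8)),
                new Header("source", "orders".getBytes(StandardCharsets.UTF_8)));
        System.out.println(toMap(headers).get("trace-id")); // prints abc-123
    }
}
```

The three-argument `Collectors.toMap` matters here: Kafka allows the same header key to appear more than once on a record, and without a merge function `toMap` throws an `IllegalStateException` on duplicates.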

Upvotes: 2

pshomov

Reputation: 33

I faced a similar issue and found a way to do this in Flink 1.8. Here is what I wrote:

FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(true) {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        ObjectNode result = super.deserialize(record);
        if (record.headers() != null) {
            // Turn each header value into a JSON text node, keyed by header name
            Map<String, JsonNode> headers = StreamSupport
                    .stream(record.headers().spliterator(), false)
                    .collect(Collectors.toMap(
                            Header::key,
                            h -> mapper.convertValue(new String(h.value()), JsonNode.class)));
            // Attach the headers under a "headers" field of the result object
            result.set("headers", mapper.convertValue(headers, JsonNode.class));
        }
        return result;
    }
}, kafkaProps);

Hope this helps!

Upvotes: 2

Abareghi

Reputation: 94

@Jicaar, actually Kafka has supported record headers since version 0.11.0.0: https://issues.apache.org/jira/browse/KAFKA-4208

The problem is that flink-connector-kafka-0.11_2.11, which ships with flink-1.4.0 and supposedly supports kafka-0.11.0.0, simply ignores message headers when reading from Kafka.

So unfortunately there is no way to read those headers unless you implement your own KafkaConsumer in Flink.

I'm also interested in reading Kafka message headers and hope the Flink team will add support for this.

Upvotes: 2
