How to use FlinkKafkaConsumer to parse the key separately as <K, V> instead of <T>

From what I can tell, Flink's Avro deserialization lets you create a stream of Avro objects, and that's fine, but there seems to be an issue: Flink's Kafka consumer only creates streams of a single object type, FlinkKafkaConsumerBase<T>, as opposed to the default Kafka API with its KafkaConsumer<K, V>.

In my case both the key and the value are separate Avro-schema-compliant objects, and merging their schemas might be a nightmare...

Additionally, it seems that with the Flink API I can't retrieve the ConsumerRecord metadata (topic, partition, offset)?
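For contrast, this is roughly what the plain Kafka consumer API gives you (MyAvroKey / MyAvroValue are just placeholders for generated Avro classes, and props is assumed to be configured with the matching key/value deserialisers):

// Plain Kafka API: typed over key AND value, with per-record metadata.
KafkaConsumer<MyAvroKey, MyAvroValue> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
for (ConsumerRecord<MyAvroKey, MyAvroValue> record : consumer.poll(Duration.ofMillis(100))) {
    MyAvroKey key = record.key();       // key deserialised on its own
    MyAvroValue value = record.value(); // value deserialised on its own
    long offset = record.offset();      // topic/partition/offset all available
}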

Upvotes: 0

Views: 3050

Answers (1)

BrightFlow

Reputation: 1294

Looking at the Flink Kafka consumer, there is a constructor:

public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
    this(Collections.singletonList(topic), deserializer, props);
}

The second parameter, KeyedDeserializationSchema, is used to deserialise a Kafka record, and it is handed the message key, message value, offset, topic, etc. So you can implement your own type, say MyKafkaRecord, holding both the Avro key and the Avro value, and use it as T in your implementation of KeyedDeserializationSchema. Refer to TypeInformationKeyValueSerializationSchema as an example.
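For reference, the interface looks roughly like this (getProducedType() is inherited from ResultTypeQueryable<T>, so that method has to be implemented alongside the two below):

public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // called once per Kafka record, with the raw key/value bytes plus metadata
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;

    // return true to stop consumption; usually false for an unbounded stream
    boolean isEndOfStream(T nextElement);
}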

For example, reading the extra record metadata from Kafka:

// Carrier type for one Kafka record: key, value, and per-record metadata.
class KafkaRecord<K, V> {
  K key;     // package-private for brevity; add getters/setters in real code
  V value;
  long offset;
  int partition;
  String topic;
}

class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
  @Override
  public KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message,
                                       String topic, int partition, long offset) throws IOException {
    KafkaRecord<K, V> rec = new KafkaRecord<>();
    rec.key = KEY_DESERIALISER.deserialize(messageKey);   // plug in your key deserialiser
    rec.value = VALUE_DESERIALISER.deserialize(message);  // plug in your value deserialiser
    rec.topic = topic;
    rec.partition = partition;
    rec.offset = offset;
    return rec;
  }

  // isEndOfStream(...) and getProducedType() still need to be implemented
}
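Putting it together, here is a minimal sketch of a concrete schema for GenericRecord keys and values plus the job wiring. The schema JSON strings, topic name, and connection properties are placeholders, null keys (tombstones) are not handled, and the schema is re-parsed per record for simplicity:

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

class AvroKeyValueSchema implements KeyedDeserializationSchema<KafkaRecord<GenericRecord, GenericRecord>> {
  private final String keySchemaJson;   // keep schemas as JSON strings: Avro Schema
  private final String valueSchemaJson; // is not Serializable in older Avro versions

  AvroKeyValueSchema(String keySchemaJson, String valueSchemaJson) {
    this.keySchemaJson = keySchemaJson;
    this.valueSchemaJson = valueSchemaJson;
  }

  // Re-parses the schema on every call for brevity; cache it in real code.
  private static GenericRecord decode(byte[] bytes, String schemaJson) throws IOException {
    Schema schema = new Schema.Parser().parse(schemaJson);
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
  }

  @Override
  public KafkaRecord<GenericRecord, GenericRecord> deserialize(
      byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
    KafkaRecord<GenericRecord, GenericRecord> rec = new KafkaRecord<>();
    rec.key = decode(messageKey, keySchemaJson);   // messageKey may be null for unkeyed records
    rec.value = decode(message, valueSchemaJson);
    rec.topic = topic;
    rec.partition = partition;
    rec.offset = offset;
    return rec;
  }

  @Override
  public boolean isEndOfStream(KafkaRecord<GenericRecord, GenericRecord> next) {
    return false; // unbounded stream
  }

  @Override
  public TypeInformation<KafkaRecord<GenericRecord, GenericRecord>> getProducedType() {
    return TypeInformation.of(new TypeHint<KafkaRecord<GenericRecord, GenericRecord>>() {});
  }
}

And the wiring into the job (KEY_SCHEMA_JSON / VALUE_SCHEMA_JSON stand for your actual Avro schema definitions):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "my-group");                // placeholder

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaRecord<GenericRecord, GenericRecord>> stream = env.addSource(
    new FlinkKafkaConsumer<>("my-topic", new AvroKeyValueSchema(KEY_SCHEMA_JSON, VALUE_SCHEMA_JSON), props));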

Upvotes: 2
