Reputation: 115
From what I can tell, with Flink's Avro deserialization you can create a stream of Avro objects, and that's fine, but there seems to be an issue: Flink's Kafka consumer only creates streams of a single object type:
FlinkKafkaConsumerBase<T>
as opposed to the default Kafka API with its KafkaConsumer.
In my case both the key and the value are separate Avro-schema-compliant objects, and merging their schemas might be a nightmare...
Additionally, it seems that with the Flink API I can't retrieve the ConsumerRecord information?...
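For illustration, the plain Kafka consumer I'm comparing against looks roughly like this (the topic name, the properties and the use of Avro's GenericRecord for key and value are just placeholders for my actual setup):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainKafkaAvroExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example");
        // key.deserializer / value.deserializer would be Avro deserializers here

        // Plain Kafka API: the consumer is typed by key AND value, and every
        // ConsumerRecord exposes both, plus topic, partition and offset.
        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                GenericRecord key = record.key();      // key and value stay separate objects
                GenericRecord value = record.value();
                long offset = record.offset();         // record metadata is directly available
            }
        }
        // With Flink, FlinkKafkaConsumerBase<T> only gives me a DataStream<T> of one type.
    }
}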
Upvotes: 0
Views: 3050
Reputation: 1294
Looking at the Flink Kafka consumer, there is a constructor:
public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
    this(Collections.singletonList(topic), deserializer, props);
}
The second parameter, KeyedDeserializationSchema, is used to deserialise the Kafka record. It includes the message key, message value, offset, topic, etc. So you can implement your own type, say MyKafkaRecord, holding both the Avro key and the Avro value, and then pass MyKafkaRecord as T to your implementation of KeyedDeserializationSchema. Refer to TypeInformationKeyValueSerializationSchema as an example.
E.g. Reading extra info from Kafka:
class KafkaRecord<K, V> {
    K key;            // package-private so the schema below can populate the fields directly
    V value;
    long offset;
    int partition;
    String topic;
    ...               // constructors, getters, etc.
}
class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {

    @Override
    public KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message,
                                         String topic, int partition, long offset) throws IOException {
        KafkaRecord<K, V> rec = new KafkaRecord<>();
        rec.key = KEY_DESERIALISER.deserialize(messageKey);   // e.g. an Avro datum reader for the key schema
        rec.value = ...;                                      // likewise for the value bytes
        rec.topic = topic;
        ...
        return rec;
    }

    // isEndOfStream(...) and getProducedType() also need to be implemented.
}
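And here is a hedged sketch of how the pieces could be wired together, assuming the KafkaRecord/MySchema types above are completed with Avro-based decoding of the key and value bytes; the GenericDatumReader helper, topic name and properties below are only illustrative, not part of any Flink API:
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AvroKeyValueJob {

    // One way MySchema could turn the raw key/value byte arrays into Avro objects.
    static GenericRecord decode(Schema schema, byte[] bytes) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The stream element type is the wrapper that carries key, value and metadata.
        DataStream<KafkaRecord<GenericRecord, GenericRecord>> stream = env.addSource(
                new FlinkKafkaConsumer<>("my-topic",
                        new MySchema<GenericRecord, GenericRecord>(),
                        props));

        stream.print();
        env.execute("Avro key/value consumer");
    }
}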
Upvotes: 2