Reputation: 305
The Flink consumer application I am developing reads from multiple Kafka topics. The messages published in the different topics adhere to the same schema (formatted as Avro). For schema management, I am using the Confluent Schema Registry.
I have been using the following snippet for the KafkaSource and it works just fine.
KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(TOPIC-1, TOPIC-2)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
.build();
Now, I want to determine the topic-name for each message that I process. Since the current deserializer is ValueOnly, I started looking into the setDeserializer() method which I felt would give me access to the whole ConsumerRecord object and I can fetch the topic-name from that.
However, I am unable to figure out how to use that implementation. Should I implement my own deserializer? If so, how does the Schema registry fit into that implementation?
Upvotes: 2
Views: 3792
Reputation: 305
I took inspiration from the above answer (by David) and added the following custom deserializer -
KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(TOPIC-1, TOPIC-2)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchema<Event>{
DeserializationSchema deserialzationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL);
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
Event event = new Event();
event.setTopicName(record.topic());
event.setMyObject((MyObject) deserializationSchema.deserialize(record.value()));
return event;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(Event.class);
}
})).build();
The Event class is a wrapper over the MyObject class with additional field for storing the topic name.
Upvotes: 1
Reputation: 43499
You can use the setDeserializer
method with a KafkaRecordDeserializationSchema
that might look something like this:
public class KafkaUsageRecordDeserializationSchema
implements KafkaRecordDeserializationSchema<UsageRecord> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
KafkaRecordDeserializationSchema.super.open(context);
objectMapper = JsonMapper.builder().build();
}
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> consumerRecord,
Collector<UsageRecord> collector) throws IOException {
collector.collect(objectMapper.readValue(consumerRecord.value(), UsageRecord.class));
}
@Override
public TypeInformation<UsageRecord> getProducedType() {
return TypeInformation.of(UsageRecord.class);
}
}
Then you can use the ConsumerRecord
to access the topic and other metadata.
Upvotes: 2