davyjones

Reputation: 305

Custom avro message deserialization with Flink

The Flink consumer application I am developing reads from multiple Kafka topics. The messages published in the different topics adhere to the same schema (formatted as Avro). For schema management, I am using the Confluent Schema Registry.

I have been using the following snippet for the KafkaSource and it works just fine.

KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(TOPIC_1, TOPIC_2)
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
                .build();

Now I want to determine the topic name for each message that I process. Since the current deserializer is value-only, I started looking into the setDeserializer() method, which I felt would give me access to the whole ConsumerRecord object, from which I could fetch the topic name.

However, I am unable to figure out how to use that implementation. Should I implement my own deserializer? If so, how does the Schema Registry fit into that implementation?

Upvotes: 2

Views: 3792

Answers (2)

davyjones

Reputation: 305

I took inspiration from David's answer and added the following custom deserializer:

KafkaSource<Event> source = KafkaSource.<Event>builder()
          .setBootstrapServers(BOOTSTRAP_SERVERS)
          .setTopics(TOPIC_1, TOPIC_2)
          .setGroupId(GROUP_ID)
          .setStartingOffsets(OffsetsInitializer.earliest())
          .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchema<Event>() {

             // Delegate the Avro payload deserialization to the Schema Registry-aware schema
             private final DeserializationSchema<MyObject> deserializationSchema =
                   ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL);

             @Override
             public boolean isEndOfStream(Event nextElement) {
                return false;
             }

             @Override
             public Event deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
                Event event = new Event();
                event.setTopicName(consumerRecord.topic());
                event.setMyObject(deserializationSchema.deserialize(consumerRecord.value()));
                return event;
             }

             @Override
             public TypeInformation<Event> getProducedType() {
                return TypeInformation.of(Event.class);
             }
          })).build();

The Event class is a wrapper over the MyObject class with an additional field for storing the topic name.
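
For context, a minimal sketch of what that wrapper might look like; the field and accessor names here are assumptions, not the actual implementation:

// Hypothetical sketch of the Event wrapper described above;
// the field and method names are assumptions.
public class Event {
    private String topicName;
    private MyObject myObject;

    public String getTopicName() { return topicName; }
    public void setTopicName(String topicName) { this.topicName = topicName; }

    public MyObject getMyObject() { return myObject; }
    public void setMyObject(MyObject myObject) { this.myObject = myObject; }
}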

Upvotes: 1

David Anderson

Reputation: 43499

You can use the setDeserializer method with a KafkaRecordDeserializationSchema that might look something like this:

public class KafkaUsageRecordDeserializationSchema
        implements KafkaRecordDeserializationSchema<UsageRecord> {

    private static final long serialVersionUID = 1L;

    // ObjectMapper is not serializable, so it is marked transient and
    // created in open() on the task manager rather than shipped with the schema
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        KafkaRecordDeserializationSchema.super.open(context);
        objectMapper = JsonMapper.builder().build();
    }

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> consumerRecord,
            Collector<UsageRecord> collector) throws IOException {

        collector.collect(objectMapper.readValue(consumerRecord.value(), UsageRecord.class));
    }

    @Override
    public TypeInformation<UsageRecord> getProducedType() {
        return TypeInformation.of(UsageRecord.class);
    }
}

Then you can use the ConsumerRecord to access the topic and other metadata.
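
For example, the deserialize method above could be extended to attach the topic name to each record; setTopic is an assumed setter on UsageRecord, shown only for illustration:

@Override
public void deserialize(
        ConsumerRecord<byte[], byte[]> consumerRecord,
        Collector<UsageRecord> collector) throws IOException {

    UsageRecord record = objectMapper.readValue(consumerRecord.value(), UsageRecord.class);
    // Per-record metadata available on the ConsumerRecord:
    // consumerRecord.topic(), consumerRecord.partition(),
    // consumerRecord.offset(), consumerRecord.timestamp()
    record.setTopic(consumerRecord.topic()); // assumes UsageRecord has such a setter
    collector.collect(record);
}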

Upvotes: 2
