artofdoe
artofdoe

Reputation: 197

How to infer avro schema from a kafka topic in Apache Beam KafkaIO

I'm using Apache Beam's kafkaIO to read from a topic that has an avro schema in Confluent schema registry. I'm able to deserialize the message and write to files. But ultimately i want to write to BigQuery. My pipeline isn't able to infer the schema. How do I extract/infer the schema and attach it to the data in the pipeline so that my downstream processes (write to BigQuery) can infer the schema?

Here is the code where I use the schema registry url to set the deserializer and where i read from Kafka:

    consumerConfig.put(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
                        options.getSchemaRegistryUrl());

String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();

ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
            ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);

pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers(options.getKafkaBrokers().get())
                        .withTopics(Utils.getListFromString(options.getKafkaTopics()))
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(valDeserializerProvider)
                        .withKeyDeserializer(ByteArrayDeserializer.class)

                        .commitOffsetsInFinalize()
                        .withoutMetadata()

        );

I initially thought that this would be enough for beam to infer the schema, but it does not since hasSchema() returns false.

Any help would be appreciated.

Upvotes: 0

Views: 1217

Answers (2)

Alexey Romanenko
Alexey Romanenko

Reputation: 1443

This code probably will work but I have not tested yet.

// Fetch Avro schema from CSR
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("schema_registry_url", 10);
SchemaMetadata latestSchemaMetadata = registryClient.getLatestSchemaMetadata("schema_name");
Schema avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);


// Create KafkaIO.Read with Avro schema deserializer
KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("host:port")
    .withTopic("topic_name")
    .withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));

// Apply Kafka.Read and set Beam schema based on Avro Schema
p.apply(read)
 .apply(Values.<GenericRecord>create()).setSchema(schema,
    AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
    AvroUtils.getFromRowFunction(GenericRecord.class))

Then I think you can use BigQueryIO.Write with useBeamSchema().

Upvotes: 0

Alexey Romanenko
Alexey Romanenko

Reputation: 1443

There is ongoing work to support inferring of Avro schema, stored in Confluent Schema Registry, in KafkaIO. Though, it's possible to do now in user pipeline code as well.

Upvotes: 1

Related Questions