developer2015
developer2015

Reputation: 423

Apache Beam: write Kafka records to an Avro file

I would like to read a couple of rows from a Kafka topic and create an Avro file.

I have partial code working: reading from the Kafka topic and printing to the console works.

What I would like to know is how to use AvroIO to write the generic records to an Avro file.

Listed below is the code that reads from the Kafka topic and prints to the console:




/**
 * Beam pipeline that reads Avro {@link GenericRecord} key/value pairs from the Kafka topic
 * {@code my-topic}, prints every key and value to standard output, and forwards each
 * {@link KafkaRecord} unchanged.
 *
 * <p>Keys and values are deserialized through
 * {@link ConfluentSchemaRegistryDeserializerProvider}; the connection to Kafka is configured
 * for SSL. Writing the records to an Avro file is not wired in yet (see the disabled step at
 * the end of {@link #main(String[])}).
 */
public class BeamConsumer {

    /**
     * Builds and runs the pipeline, then waits up to 1000 seconds for it to finish.
     *
     * @param args command-line arguments; not used
     * @throws IOException if {@code schema.avsc} cannot be read
     */
    public static void main(String[] args) throws IOException {

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        // Only the (currently disabled) Avro write step below refers to this schema.
        Schema schema =
                new Schema.Parser()
                        .parse(new File("schema.avsc"));

        PTransform<PBegin, PCollection<KafkaRecord<GenericRecord, GenericRecord>>> input =
                KafkaIO.<GenericRecord, GenericRecord>read()
                        .withBootstrapServers(
                                "${kafkaserveraddress}")
                        .withTopic("my-topic") // use
                        // withTopics(List<String>) to read from multiple topics.
                        .withKeyDeserializer(
                                ConfluentSchemaRegistryDeserializerProvider.of(
                                        "${schemaregistryaddress}",
                                        "schemaregistrysubjectkey"))
                        .withValueDeserializer(
                                ConfluentSchemaRegistryDeserializerProvider.of(
                                        "${schemaregistryaddress}",
                                        "schemaregistrysubjectvalue"))
                        .withConsumerConfigUpdates(
                                ImmutableMap.of(
                                        ConsumerConfig.GROUP_ID_CONFIG,
                                        "my-group-id",
                                        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                        "SSL",
                                        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                                        "/truststore.jks",
                                        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
                                        "******",
                                        SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                                        "/keystore.jks",
                                        SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
                                        "*******",
                                        SslConfigs.SSL_KEY_PASSWORD_CONFIG,
                                        "*******",
                                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                                        "latest",
                                        ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
                                        600000));

        pipeline
                .apply(input)
                .apply(
                        "ExtractRecord",
                        ParDo.of(
                                new DoFn<
                                        KafkaRecord<GenericRecord, GenericRecord>,
                                        KafkaRecord<GenericRecord, GenericRecord>>() {
                                    /** Prints the record's key and value, then re-emits the record as is. */
                                    @DoFn.ProcessElement
                                    public void processElement(ProcessContext c) {
                                        // c.element() is already typed by the DoFn's input parameter,
                                        // so no cast is needed.
                                        KafkaRecord<GenericRecord, GenericRecord> record = c.element();
                                        KV<GenericRecord, GenericRecord> kv = record.getKV();
                                        System.out.println("Key Obtained: " + kv.getKey());
                                        // Concatenate instead of calling toString() so that a record
                                        // with a null value prints "null" rather than throwing a
                                        // NullPointerException, matching how the key is printed.
                                        System.out.println("Value Obtained: " + kv.getValue());
                                        c.output(record);
                                    }
                                }));
        // Disabled Avro write step, kept for reference:
        //            .apply("WriteToAvro",
        // AvroIO.writeGenericRecords(schema).to("/Users/mjain34/code/avroutils/src/main/resources/file.avro"));
        //
        // NOTE(review): the step above emits KafkaRecord elements, not GenericRecord, so this
        // write presumably cannot be chained on directly; the record values would have to be
        // extracted first. Since the Kafka source is unbounded, windowing and windowed writes
        // are probably required as well -- confirm against the AvroIO documentation.
        PipelineResult run = pipeline.run();
        // Block for at most 1000 seconds.
        run.waitUntilFinish(Duration.standardSeconds(1000));
    }
}

Note: I have modified the config values here to hide private information.

Upvotes: 0

Views: 180

Answers (0)

Related Questions