Reputation: 423
I would like to read a couple of rows from a Kafka topic and create an Avro file.
I have partial code working that reads from the Kafka topic and prints the records to the console.
What I would like to know is how to use AvroIO to write the generic records to an Avro file.
Listed below is the code that reads from the Kafka topic and prints to the console:
/**
 * Beam pipeline that reads Avro {@code GenericRecord} key/value pairs from the Kafka topic
 * {@code my-topic} and prints each record's key and value to standard output.
 */
public class BeamConsumer {

  /** Placeholder for the Kafka bootstrap server address. */
  private static final String BOOTSTRAP_SERVERS = "${kafkaserveraddress}";

  /** Placeholder for the Confluent Schema Registry address. */
  private static final String SCHEMA_REGISTRY_URL = "${schemaregistryaddress}";

  /**
   * Builds the pipeline, runs it, and waits up to 1000 seconds for it to finish.
   *
   * @param args command-line arguments (not used)
   * @throws IOException if {@code schema.avsc} cannot be read
   */
  public static void main(String[] args) throws IOException {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Parsed up front; currently only referenced by the commented-out AvroIO write below.
    Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

    PTransform<PBegin, PCollection<KafkaRecord<GenericRecord, GenericRecord>>> input =
        kafkaSource();

    pipeline
        .apply(input)
        .apply("ExtractRecord", ParDo.of(new PrintRecordFn()));
    // .apply("WriteToAvro",
    // AvroIO.writeGenericRecords(schema).to("/Users/mjain34/code/avroutils/src/main/resources/file.avro"));
    //

    pipeline.run().waitUntilFinish(Duration.standardSeconds(1000));
  }

  /**
   * Creates the Kafka read transform: SSL-secured consumer in group {@code my-group-id},
   * with key and value deserializers obtained from the Confluent Schema Registry.
   */
  private static PTransform<PBegin, PCollection<KafkaRecord<GenericRecord, GenericRecord>>>
      kafkaSource() {
    return KafkaIO.<GenericRecord, GenericRecord>read()
        .withBootstrapServers(BOOTSTRAP_SERVERS)
        // use withTopics(List<String>) to read from multiple topics.
        .withTopic("my-topic")
        .withKeyDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                SCHEMA_REGISTRY_URL, "schemaregistrysubjectkey"))
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                SCHEMA_REGISTRY_URL, "schemaregistrysubjectvalue"))
        .withConsumerConfigUpdates(
            ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "my-group-id",
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL",
                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/truststore.jks",
                SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******",
                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/keystore.jks",
                SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "*******",
                SslConfigs.SSL_KEY_PASSWORD_CONFIG, "*******",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000));
  }

  /** Prints the key and value of every Kafka record, then emits the record unchanged. */
  private static class PrintRecordFn
      extends DoFn<
          KafkaRecord<GenericRecord, GenericRecord>,
          KafkaRecord<GenericRecord, GenericRecord>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      KafkaRecord<GenericRecord, GenericRecord> kafkaRecord = c.element();
      KV<GenericRecord, GenericRecord> keyValue = kafkaRecord.getKV();
      System.out.println("Key Obtained: " + keyValue.getKey());
      System.out.println("Value Obtained: " + keyValue.getValue().toString());
      c.output(kafkaRecord);
    }
  }
}
Note: I have modified the config values here to hide private information.
Upvotes: 0
Views: 180