vkt

Reputation: 1459

Send BigQuery table rows to Kafka as Avro messages using Apache Beam

I need to publish BigQuery table rows to Kafka in Avro format.

PCollection<TableRow> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)));

// How to convert rows to Avro format?

rows.apply(KafkaIO.<Long, ???>write()
                .withBootstrapServers("kafka:29092")
                .withTopic("test")
                .withValueSerializer(KafkaAvroSerializer.class)
        );

How to convert TableRow to Avro format?

Upvotes: 0

Views: 238

Answers (1)

OneCricketeer

Reputation: 191738

Use MapElements to convert each TableRow into an Avro GenericRecord:

rows.apply(MapElements.via(new SimpleFunction<TableRow, GenericRecord>() {
  @Override
  public GenericRecord apply(TableRow input) {
    log.info("Parsing {} to Avro", input);
    return null; // TODO: build a GenericRecord from the row's fields
  }
}));

If the row is a collection type that you want to flatten into many records, you can use FlatMapElements instead.
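Since TableRow implements Map<String, Object>, the body of apply is essentially a field-by-field copy into the Avro record. A stand-alone sketch of that copy logic, using plain maps in place of TableRow and Avro's GenericRecord (copyFields and the schema-field set are illustrative names, not Beam or Avro API; in the pipeline you would call record.put(name, value) on a GenericData.Record built from your Avro schema):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class TableRowToAvroSketch {

    // Copy each BigQuery column name/value pair into the record,
    // validating the column name against the Avro schema's field names.
    static Map<String, Object> copyFields(Map<String, Object> tableRow, Set<String> schemaFields) {
        Map<String, Object> record = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : tableRow.entrySet()) {
            if (!schemaFields.contains(e.getKey())) {
                throw new IllegalArgumentException("Column not in Avro schema: " + e.getKey());
            }
            record.put(e.getKey(), e.getValue());
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1L);
        row.put("name", "alice");
        System.out.println(copyFields(row, Set.of("id", "name"))); // {id=1, name=alice}
    }
}
```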

As for writing to Kafka, I wrote a simple example.

Upvotes: 1
