PRASAD KEDARI
PRASAD KEDARI

Reputation: 55

Partitioned table loading using DataFlow job

I want to read the file and need to write it to BigQuery Partitioned table, based on the date value present in the field of the file. e.g. If file contains 2 dates 25 and 26 July then DataFlow should write that data to 2 partitions based on the data present in the file.

public class StarterPipeline {
  private static final Logger LOG =
      LoggerFactory.getLogger(StarterPipeline.class);

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("");
    options.setTempLocation("gs://stage_location/");
    Pipeline p = Pipeline.create(options);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("name").setType("STRING"));
    fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
    fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<String> read = p.apply("Read Lines",TextIO.read().from("gs://hadoop_source_files/employee.txt"));

    PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String,TableRow>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        String[] data = c.element().split(",");

        c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3]));
      }
    }));


    rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
      public String getDate(String value) {
        return "project:dataset.DataFlow_Test$"+value;
      }

      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        TableRow row = value.getValue();
        String date = getDate(row.get("joindate").toString());
        String tableSpec = date;
        String tableDescription = "";
        return new TableDestination(tableSpec, tableDescription);
      }
    }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
      @Override
      public TableRow apply(TableRow input) {
        // TODO Auto-generated method stub
        return input;
      }
    }).withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }
}

while running above program I am getting below error: Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format: Caused by: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format. Let me know if there are any recommendations

Upvotes: 1

Views: 1227

Answers (2)

PRASAD KEDARI
PRASAD KEDARI

Reputation: 55

I am able to load data into partition table based on date present in the data using below code:

       rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
         @Override
         public TableDestination apply(ValueInSingleWindow<TableRow> value) {
           TableRow row = value.getValue();
           TableReference reference = new TableReference();
           reference.setProjectId("");
           reference.setDatasetId("");

           reference.setTableId("tabelname$" + row.get("datefield").toString());
           return new TableDestination(reference, null);
         }
       }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
         @Override
         public TableRow apply(TableRow input) {
            LOG.info("format function:"+input.toString());
           return input;
         }
       }).withSchema(schema)
           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Upvotes: 1

Ben Chambers
Ben Chambers

Reputation: 6130

Beam does not currently support date partitioned tables. See BEAM-2390 for the issue tracking this feature.

Upvotes: 1

Related Questions