John David

Reputation: 43

Overwrite some partitions of a partitioned BigQuery table

I am currently trying to develop a Dataflow pipeline to replace some partitions of a partitioned BigQuery table. The table is partitioned on a custom date field, and the input of my pipeline is a file that may contain rows for several different dates.

I developed the following pipeline:

    PipelineOptionsFactory.register(BigQueryOptions.class);
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> rows =  p.apply("ReadLines", TextIO.read().from(options.getFileLocation()))
            .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));



    ValueProvider<String>  projectId = options.getProjectId();
    ValueProvider<String> datasetId = options.getDatasetId();
    ValueProvider<String> tableId = options.getTableId();
    ValueProvider<String> partitionField = options.getPartitionField();
    ValueProvider<String> columnNames = options.getColumnNames();
    ValueProvider<String> types = options.getTypes();

    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(new DynamicDestinations<TableRow, String>() {

                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {

                    TableRow row = element.getValue();

                    // The partition field holds a "yyyy-MM-dd" date; BigQuery
                    // partition decorators expect "yyyyMMdd"
                    String partitionDestination = (String) row.get(partitionField.get());

                    SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                    SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");

                    try {

                        partitionDestination = to.format(from.parse(partitionDestination));
                        LOG.info("Table destination "+partitionDestination);
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"$"+partitionDestination;

                    } catch(ParseException e){
                        e.printStackTrace();
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"_rowsWithErrors";
                    }
                }

                @Override
                public TableDestination getTable(String destination) {

                    TimePartitioning timePartitioning = new TimePartitioning();
                    timePartitioning.setField(partitionField.get());
                    timePartitioning.setType("DAY");
                    timePartitioning.setRequirePartitionFilter(true);

                    TableDestination tableDestination  = new TableDestination(destination, null, timePartitioning);

                    LOG.info(tableDestination.toString());

                    return tableDestination;

                }

                @Override
                public TableSchema getSchema(String destination) {

                        return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
                }
            })
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    p.run();
}
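As a side note, `SimpleDateFormat` is not thread-safe, which can matter inside a `DoFn` or `DynamicDestinations` callback. The date-to-decorator conversion done in `getDestination` can be sketched with the thread-safe `java.time` API instead (a minimal, standalone sketch; the project/dataset/table names are placeholders):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionDecorator {

    // Input dates arrive as "yyyy-MM-dd"; BigQuery partition decorators use "yyyyMMdd"
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Builds a "project:dataset.table$yyyymmdd" destination string,
    // mirroring what getDestination returns in the pipeline above
    static String partitionDestination(String project, String dataset,
                                       String table, String dateValue) {
        String suffix = LocalDate.parse(dateValue, IN).format(OUT);
        return project + ":" + dataset + "." + table + "$" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(
            partitionDestination("my-project", "my_dataset", "my_table", "2019-03-01"));
    }
}
```

Unlike `SimpleDateFormat`, the `DateTimeFormatter` instances can safely be shared as constants across worker threads, and `LocalDate.parse` throws an unchecked `DateTimeParseException` rather than a checked `ParseException`.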

When I run the pipeline locally, it correctly replaces only the partitions whose dates appear in the input file. However, when I deploy it as a template on Google Cloud Dataflow and run it with the exact same parameters, it truncates all the data, and at the end my table contains only the file I wanted to upload.

Do you know why there is such a difference?

Thank you !

Upvotes: 4

Views: 1883

Answers (1)

Kevin Chien

Reputation: 99

You set BigQueryIO.Write.CreateDisposition to CREATE_IF_NEEDED and paired it with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, so even if the table already exists it may be recreated. This is why you see your whole table being replaced.

See this document [1] for details.
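If the partitioned table is created ahead of time, one direction to try (a sketch, not a verified fix; it reuses the same `DynamicDestinations` object from the question) is to switch to CREATE_NEVER so the write can never recreate the table, leaving WRITE_TRUNCATE to act on the destination named by the `$yyyymmdd` partition decorator:

```java
rows.apply("Write to BQ", BigQueryIO.writeTableRows()
        .to(/* same DynamicDestinations as in the question */)
        // CREATE_NEVER: fail if the table is missing instead of recreating it
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
```

This assumes the table and its time partitioning already exist, since CREATE_NEVER makes the pipeline fail fast rather than create anything.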

[1] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write.CreateDisposition#CREATE_IF_NEEDED

Upvotes: 2
