rish0097

Reputation: 1094

Write a particular PCollection to BigQuery

Suppose I create two output PCollections using side outputs, and depending on some condition I want to write only one of them to BigQuery. How can I do this?

Basically, my use case is that I'm trying to make the choice between WRITE_APPEND and WRITE_TRUNCATE dynamic. I fetch this information (append/truncate) from a config table that I maintain in BigQuery, so depending on what the config table contains, I must apply either truncate or append.

Using side outputs, I was able to create two PCollections (append and truncate, respectively), one of which will be empty. The one that holds all the rows must be written to BigQuery. Is this approach correct?

The code I'm using:

    // Main output: rows for the WRITE_TRUNCATE branch.
    final TupleTag<TableRow> truncate = new TupleTag<TableRow>(){};
    // Additional output: rows for the WRITE_APPEND branch.
    final TupleTag<TableRow> append = new TupleTag<TableRow>(){};

    PCollectionTuple results = read.apply("convert to table row", ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // The config side input holds either "truncate" or "append".
            String value = c.sideInput(configView).get(0).toString();
            LOG.info("config: " + value);
            if (value.equals("truncate")) {
                LOG.info("outputting to truncate");
                // Goes to the main output (tag: truncate).
                c.output(new TableRow().set("color", c.element()));
            } else {
                LOG.info("outputting to append");
                c.output(append, new TableRow().set("color", c.element()));
            }
        }
    }).withSideInputs(configView).withOutputTags(truncate, TupleTagList.of(append)));

    results.get(truncate).apply("truncate", BigQueryIO.writeTableRows()
            .to("projectid:datasetid.tableid")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    results.get(append).apply("append", BigQueryIO.writeTableRows()
            .to("projectid:datasetid.tableid")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

I need to perform only one of the two writes. If I apply both, the table gets truncated anyway.

P.S. I'm using the Java SDK (Apache Beam 2.1).

Upvotes: 0

Views: 869

Answers (1)

jkff

Reputation: 17913

I believe you are right: currently, if your pipeline contains a write to a BigQuery table with WRITE_TRUNCATE at all, the table gets truncated even if there is no data to write. Feel free to file a JIRA asking for more configurable behavior in this case.

So if you want the table to stay untruncated under some condition, you need to leave that write transform out of the pipeline entirely under that condition. Is there a way to push the condition up to that level (i.e., evaluate it while constructing the pipeline), or does it actually have to be computed from other data in the pipeline?
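If the config value can be read before the pipeline is built (for example, passed in as a pipeline option, or fetched with a one-off query in `main` before the pipeline is created), the choice can be made at construction time, so only one BigQuery write is ever added to the graph. Below is a minimal sketch of just that decision in plain Java, assuming the config holds the strings used in the question. `DispositionChooser` and its `WriteMode` enum are hypothetical stand-ins for `BigQueryIO.Write.WriteDisposition`, used so the sketch runs without Beam on the classpath.

```java
// Plain-Java model of a construction-time choice between append and truncate.
// WriteMode is a hypothetical stand-in for BigQueryIO.Write.WriteDisposition.
final class DispositionChooser {
    enum WriteMode { WRITE_APPEND, WRITE_TRUNCATE }

    private DispositionChooser() {}

    // Maps the value read from the config table to a write mode.
    // Mirrors the question's DoFn: exactly "truncate" means truncate,
    // anything else (including a missing value) means append.
    static WriteMode fromConfig(String configValue) {
        return "truncate".equals(configValue)
                ? WriteMode.WRITE_TRUNCATE
                : WriteMode.WRITE_APPEND;
    }
}
```

In the pipeline, the result would then drive a single write, e.g. `.withWriteDisposition(BigQueryIO.Write.WriteDisposition.valueOf(mode.name()))`, so there is no second, always-truncating write transform.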

(The only workaround I can think of is to use DynamicDestinations to choose the name of the table to truncate dynamically, and to truncate some other, empty dummy table instead. I can elaborate on this after you answer the question in the previous paragraph.)
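The routing decision behind that workaround can be modeled as a function from the config value to the table that the WRITE_TRUNCATE write should target. This is a plain-Java sketch, not a Beam implementation: `TruncateTarget` and the dummy table name are hypothetical, and in a real pipeline this logic would live inside a `DynamicDestinations` subclass that also has access to the config value.

```java
// Plain-Java model of the table-selection step in the DynamicDestinations workaround.
final class TruncateTarget {
    // Placeholder table spec taken from the question.
    static final String REAL_TABLE = "projectid:datasetid.tableid";
    // Hypothetical empty table that is safe to truncate on "append" runs.
    static final String DUMMY_TABLE = "projectid:datasetid.dummy_empty_table";

    private TruncateTarget() {}

    // Returns the table the WRITE_TRUNCATE branch should point at:
    // the real table only when the config says "truncate", otherwise the dummy.
    static String tableToTruncate(String configValue) {
        return "truncate".equals(configValue) ? REAL_TABLE : DUMMY_TABLE;
    }
}
```

Under this reading, on "append" runs the truncating write only ever touches the dummy table, while the real table is written by the WRITE_APPEND branch.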

Upvotes: 0
