rish0097

Reputation: 1094

Execute read operations in sequence - Apache Beam

I need to execute the operations below in sequence, as given:

PCollection<String> read = p.apply("Read Lines", TextIO.read().from(options.getInputFile()))
    .apply("Get fileName", ParDo.of(new DoFn<String, String>() {
        ValueProvider<String> fileReceived = options.getfilename();

        @ProcessElement
        public void processElement(ProcessContext c) {
            fileName = fileReceived.get().toString();
            LOG.info("File: " + fileName);
        }
    }));

PCollection<TableRow> rows = p.apply("Read from BigQuery",
    BigQueryIO.read()
        .fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName + "'")
        .usingStandardSql());

How to accomplish this in Apache Beam/Dataflow?

Upvotes: 0

Views: 656

Answers (1)

jkff

Reputation: 17913

It seems that you want to apply BigQueryIO.read().fromQuery() to a query that depends on a value exposed as a ValueProvider<String> property of your PipelineOptions, and that value is not available at pipeline construction time - i.e. you are invoking your job from a template.

In that case, the proper solution is to use NestedValueProvider:

PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
    NestedValueProvider.of(
        options.getfilename(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String filename) {
            return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + filename + "'";
          }
        })));
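The SerializableFunction passed to NestedValueProvider is just a plain function from the runtime value to the query string, so its logic can be sketched and unit-tested in isolation, without any Beam dependency. A minimal sketch (the `buildQuery` helper name is hypothetical, not part of the Beam API):

```java
public class QueryBuilder {

    // Builds the Standard SQL query for the given file name, exactly as the
    // SerializableFunction above would at runtime. Note: plain string
    // concatenation like this is only safe for trusted inputs.
    static String buildQuery(String filename) {
        return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='"
                + filename + "'";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("data.csv"));
    }
}
```

Keeping the string-building logic in a helper like this makes it easy to verify the generated SQL before wiring it into NestedValueProvider.of(...).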

Upvotes: 2
