I need to execute the following operations in sequence:
// fileName is assumed to be a String field declared outside this snippet
PCollection<String> read = p.apply("Read Lines", TextIO.read().from(options.getInputFile()))
    .apply("Get fileName", ParDo.of(new DoFn<String, String>() {
        ValueProvider<String> fileReceived = options.getfilename();

        @ProcessElement
        public void processElement(ProcessContext c) {
            fileName = fileReceived.get();
            LOG.info("File: " + fileName);
        }
    }));

PCollection<TableRow> rows = p.apply("Read from BigQuery",
    BigQueryIO.read()
        .fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName + "'")
        .usingStandardSql());
How can I accomplish this in Apache Beam/Dataflow?
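It seems that you want to apply BigQueryIO.read().fromQuery() to a query that depends on a value available via a property of type ValueProvider<String> in your PipelineOptions, and that the provider is not accessible at pipeline construction time, i.e. you are invoking your job via a template.

Assigning the value inside a DoFn cannot work here: the query string passed to fromQuery() is built once, while the pipeline graph is being constructed, whereas the DoFn only runs later on the workers, so the query never sees the assignment.

In that case, the proper solution is to use NestedValueProvider, which wraps the parameter and applies a function to its value only when that value is requested at run time. The resulting ValueProvider<String> can be passed directly to fromQuery():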
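import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
    // The function is applied when the query is needed at run time, not at construction time
    NestedValueProvider.of(
        options.getfilename(),
        new SerializableFunction<String, String>() {
            @Override
            public String apply(String filename) {
                return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + filename + "'";
            }
        }))
    .usingStandardSql()); // backtick-quoted table names require standard SQL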
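To see why wrapping the query in NestedValueProvider works while assigning a field in a DoFn does not, here is a plain-Java model of deferred evaluation. It is only a sketch under stated assumptions: it does not use Beam at all, java.util.function.Supplier stands in for ValueProvider, a static field stands in for the template parameter supplied at launch, and the names DeferredQueryDemo, nested, launchParameter and simulate are hypothetical.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of deferred evaluation. This is NOT Beam's
// ValueProvider API; it only illustrates why NestedValueProvider works.
class DeferredQueryDemo {

    // Stand-in for a template parameter: unset at construction time,
    // filled in only when the job is launched.
    static String launchParameter = null;

    static String buildQuery(String filename) {
        return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + filename + "'";
    }

    // Mimics NestedValueProvider.of(inner, fn): stores the function and
    // applies it to inner.get() only when the result is requested.
    static <X, T> Supplier<T> nested(Supplier<X> inner, Function<X, T> fn) {
        return () -> fn.apply(inner.get());
    }

    // Returns {eagerQuery, deferredQuery} after simulating a launch with launchValue.
    static String[] simulate(String launchValue) {
        launchParameter = null; // "pipeline construction": no value yet
        Supplier<String> filenameParam = () -> launchParameter;

        // Eager: the query string is fixed now, while the value is still missing.
        String eagerQuery = buildQuery(filenameParam.get());

        // Deferred: only the recipe is stored; nothing is evaluated yet.
        Supplier<String> deferredQuery = nested(filenameParam, DeferredQueryDemo::buildQuery);

        launchParameter = launchValue; // "job launch": the value becomes available
        return new String[] {eagerQuery, deferredQuery.get()};
    }

    public static void main(String[] args) {
        String[] queries = simulate("input.csv");
        System.out.println("eager:    " + queries[0]);
        System.out.println("deferred: " + queries[1]);
    }
}
```

Running main prints the eager query ending in file='null', because the string was concatenated before the parameter existed, and the deferred query ending in file='input.csv', because the function only ran after the value was set. The same ordering is what lets fromQuery() accept a ValueProvider<String> in a templated job.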