Reputation: 1
I'm new to Dataflow and am trying to get the schema of a BigQuery table dynamically. I also need to get the name of the destination table dynamically, for which I'm using the DynamicDestinations class in BigQueryIO.write().to(). This works if the schema of the destination table is provided before the pipeline executes. But to get the schema dynamically, I'm using the BigQuery Snippets approach, which takes a datasetId and tableId as input and returns the schema of the given table. When I run the pipeline with that lookup in place, it fails with the errors below.
Any help is appreciated. Thanks in advance.
Exception in thread "main" java.lang.NoSuchMethodError: com.google.api.client.googleapis.services.json.AbstractGoogleJsonClient$Builder.setBatchPath(Ljava/lang/String;)Lcom/google/api/client/googleapis/services/AbstractGoogleClient$Builder;
at com.google.api.services.bigquery.Bigquery$Builder.setBatchPath(Bigquery.java:3519)
at com.google.api.services.bigquery.Bigquery$Builder.<init>(Bigquery.java:3498)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryClient(BigQueryServicesImpl.java:881)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$200(BigQueryServicesImpl.java:79)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:388)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:345)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:105)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:676)
at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at project2.configTable.main(configTable.java:146)
Code:
package project2;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.common.collect.ImmutableList;
public class configTable {

    public static void main(String[] args) {
        // customInt is a custom options interface (not shown here) that
        // extends DataflowPipelineOptions and adds getInputFile().
        customInt op = PipelineOptionsFactory.as(customInt.class);
        op.setProject("my-new-project");
        op.setTempLocation("gs://train-10/projects");
        op.setWorkerMachineType("n1-standard-1");
        op.setTemplateLocation("gs://train-10/main-template-with-snippets");
        op.setRunner(DataflowRunner.class);

        Pipeline p = Pipeline.create(op);

        // Read the config table that holds the name of the destination table.
        PCollection<TableRow> indata = p.apply("Taking side input",
                BigQueryIO.readTableRows().from("my-new-project:training.config"));

        // Turn the config row into a singleton side input.
        PCollectionView<String> view = indata.apply("Convert to view",
                ParDo.of(new DoFn<TableRow, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        TableRow row = c.element();
                        c.output(row.get("file").toString());
                    }
                })).apply(View.asSingleton());

        // Read the input file named by the inputFile option and wrap each
        // line in a TableRow.
        PCollection<TableRow> mainop = p.apply("Taking input",
                TextIO.read().from(NestedValueProvider.of(op.getInputFile(),
                        new SerializableFunction<String, String>() {
                            public String apply(String input) {
                                return "gs://train-10/projects/" + input;
                            }
                        })))
                .apply("Transform", ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(new TableRow().set("data", c.element()));
                    }
                }));

        // Route rows to the table named by the side input, looking up the
        // table's schema with the BigQuery client library at write time.
        mainop.apply("Write data", BigQueryIO.writeTableRows()
                .to(new DynamicDestinations<TableRow, String>() {
                    @Override
                    public String getDestination(ValueInSingleWindow<TableRow> element) {
                        String d = sideInput(view);
                        return "my-new-project:training." + d;
                    }

                    @Override
                    public List<PCollectionView<?>> getSideInputs() {
                        return ImmutableList.of(view);
                    }

                    @Override
                    public TableDestination getTable(String destination) {
                        return new TableDestination(destination, destination);
                    }

                    @Override
                    public TableSchema getSchema(String destination) {
                        // Fetch the destination table's schema and convert it
                        // to the model TableSchema that BigQueryIO expects.
                        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                        com.google.cloud.bigquery.Table table = bigquery.getTable("training", destination);
                        com.google.cloud.bigquery.Schema tbschema = table.getDefinition().getSchema();
                        FieldList tfld = tbschema.getFields();
                        List<TableFieldSchema> flds = new ArrayList<>();
                        for (Field each : tfld) {
                            flds.add(new TableFieldSchema()
                                    .setName(each.getName())
                                    .setType(each.getType().toString()));
                        }
                        return new TableSchema().setFields(flds);
                    }
                })
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

        p.run();
    }
}
Upvotes: 0
Views: 625
Reputation: 4166
I don't think you can do both WRITE_TRUNCATE
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(WriteDisposition.WRITE_TRUNCATE))
and get the table's definition
com.google.cloud.bigquery.Table table=bigquery.getTable("training", destination);
com.google.cloud.bigquery.Schema tbschema=table.getDefinition().getSchema();
This is because even if the table exists, it may be dropped and recreated when the write is paired with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, and at that point the getTable call will fail. In other words, WRITE_TRUNCATE is not an atomic operation.
I suggest that you either create the table (with the right schema) beforehand (CREATE_NEVER), append to the table if it exists (WRITE_EMPTY or WRITE_APPEND), or store the schema outside of the Dataflow pipeline and read it in, as sketched below.
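For the last option, a minimal sketch of what that could look like, assuming the destination table name can be resolved before the pipeline is constructed (for example by querying the config table with the same client library in main()). The schema is fetched once, serialized to JSON with Beam's BigQueryHelpers, and the resulting String is captured by the DynamicDestinations instance, so getSchema never calls the BigQuery service at write time. The dataset name (training) follows the question; fetchSchemaJson and destTableName are hypothetical names:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;

// Hypothetical helper, called in main() before the pipeline is built.
static String fetchSchemaJson(String tableName) {
    BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    com.google.cloud.bigquery.Schema schema =
            bq.getTable("training", tableName).getDefinition().getSchema();
    List<TableFieldSchema> fields = new ArrayList<>();
    for (Field f : schema.getFields()) {
        fields.add(new TableFieldSchema()
                .setName(f.getName())
                .setType(f.getType().toString()));
    }
    // TableSchema is not Serializable, so pass it around as a JSON string
    // rather than capturing the object in the anonymous class.
    return BigQueryHelpers.toJsonString(new TableSchema().setFields(fields));
}

getSchema then becomes a plain deserialization, with no service call that can race with the truncation:

final String schemaJson = fetchSchemaJson(destTableName);

@Override
public TableSchema getSchema(String destination) {
    return BigQueryHelpers.fromJsonString(schemaJson, TableSchema.class);
}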
Upvotes: 2