Adam Brocklehurst

Reputation: 513

Writing to BigQuery from Cloud Dataflow: Unable to create a side-input view from input

I'm trying to write a Dataflow pipeline that reads a stream from Pub/Sub and writes it into BigQuery.

When I run the pipeline I get the error "Unable to create a side-input view from input", with this stack trace:

Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116)
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275)
... 20 more

My code is:

public class StarterPipeline {

  public static final Duration ONE_DAY = Duration.standardDays(1);
  public static final Duration ONE_HOUR = Duration.standardHours(1);
  public static final Duration TEN_SECONDS = Duration.standardSeconds(10);
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  private static TableSchema schemaGen() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("facebookID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("propertyID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP"));
    TableSchema schema = new TableSchema().setFields(fields);
    return schema;
  }

  public static void main(String[] args) {
    LOG.info("Starting");
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    LOG.info("Pipeline made");
    // For Cloud execution, set the Cloud Platform project, staging location,
    // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
    options.setProject(<project>);
    options.setStagingLocation(<bucket>);
    options.setTempLocation(<bucket>);
    Pipeline p = Pipeline.create(options);

    TableSchema schema = schemaGen();
    LOG.info("Schema made");
    try {
      LOG.info(schema.toPrettyString());
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    PCollection<String> input = p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>));

    PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn()));
    LOG.info("Formatted Row");

    pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    LOG.info("about to run");
    p.run();
  }

  static class FormatAsTableRowFn extends DoFn<String, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
      LOG.info("Formatting");
      String json = c.element();

      // HashMap<String, String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType());

      // Make a BigQuery row from the JSON object.
      // The keys must match the field names declared in schemaGen().
      TableRow row = new TableRow()
          .set("facebookID", "324234")
          .set("propertyID", "23423")
          .set("time", "12312313123");

      /*
       * TableRow row = new TableRow()
       *     .set("facebookID", items.get("facbookID"))
       *     .set("propertyID", items.get("propertyID"))
       *     .set("time", items.get("time"));
       */
      c.output(row);
    }
  }
}

Any suggestions on what this might be?

Upvotes: 1

Views: 1191

Answers (1)

danielm

Reputation: 3010

The default implementation of BigQueryIO only works over bounded PCollections, and PubsubIO.Read produces an unbounded PCollection.

There are two ways to fix this: you can bound the input by calling maxReadTime or maxNumRecords on your PubsubIO transform, or you can switch BigQueryIO to streaming inserts by calling setStreaming(true) on your options.
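
A minimal sketch of both options, assuming the 1.x Dataflow SDK used in the question (com.google.cloud.dataflow.sdk) and assuming the record-count limit on PubsubIO.Read is exposed as maxNumRecords. The class name, the useStreamingInserts flag, the single-column "payload" schema, and all project, bucket, subscription, and table strings are hypothetical placeholders, not values from the question.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import java.util.Arrays;

public class UnboundedToBigQuerySketch {

  // Hypothetical placeholders: substitute your own resources.
  static final String PROJECT = "my-project";
  static final String BUCKET = "gs://my-bucket/staging";
  static final String SUBSCRIPTION = "projects/my-project/subscriptions/my-subscription";
  static final String TABLE = "my-project:my_dataset.my_table";

  // Wraps each Pub/Sub message in a one-column row (illustrative schema only).
  static class ToRowFn extends DoFn<String, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
      c.output(new TableRow().set("payload", c.element()));
    }
  }

  public static void main(String[] args) {
    // true  -> option 2: keep the input unbounded and use streaming inserts.
    // false -> option 1: bound the Pub/Sub read so the batch write path applies.
    boolean useStreamingInserts = true;

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject(PROJECT);
    options.setStagingLocation(BUCKET);
    options.setTempLocation(BUCKET);
    // Option 2: run the pipeline in streaming mode.
    options.setStreaming(useStreamingInserts);

    Pipeline p = Pipeline.create(options);

    PubsubIO.Read.Bound<String> read =
        PubsubIO.Read.named("ReadFromPubsub").subscription(SUBSCRIPTION);
    if (!useStreamingInserts) {
      // Option 1: stop reading after a fixed number of messages.
      // maxReadTime(Duration) is the time-based alternative.
      read = read.maxNumRecords(1000);
    }

    TableSchema schema = new TableSchema().setFields(
        Arrays.asList(new TableFieldSchema().setName("payload").setType("STRING")));

    p.apply(read)
        .apply(ParDo.of(new ToRowFn()))
        .apply(BigQueryIO.Write.named("WriteToBigQuery").to(TABLE)
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```

Bounding the read is mostly useful for local testing, since the pipeline stops once the limit is reached. For a job that should keep consuming the subscription, the streaming option is the likelier fit, and it probably also needs the runner set explicitly, as the comment in the question's own code suggests.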

Upvotes: 1
