c1mone

Reputation: 208

Use Dataflow failed insert WriteResult to handle table not found exception

Hi, I want to create tables dynamically, on the fly, in a Dataflow pipeline.

First, I capture the BigQueryIO WriteResult, then use it to create the table:

        WriteResult writeResult =
            incomingRecords.apply(
                    "WriteToBigQuery",
                    BigQueryIO.<TableRowWithSchema>write()
                            .to(new DynamicTables())
                            .withFormatFunction(TableRowWithSchema::getTableRow)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

But I still get a table-not-found exception:

Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table projectId:datasetId.StreamComment",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table projectId:datasetId.StreamComment",
  "status" : "NOT_FOUND"
}

Is anything wrong? Thanks.

Upvotes: 3

Views: 571

Answers (1)

Juan Urrego

Reputation: 353

You have 2 things wrong:

1) If you want to create tables dynamically, you need to use the CREATE_IF_NEEDED create disposition instead of CREATE_NEVER.

    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

2) When a table is created dynamically, you need to supply its schema, using either the withSchema method or the withJsonSchema method.

That will do the trick!
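
As a rough illustration of point 2, here is a minimal sketch of building the JSON schema string that withJsonSchema expects, using only the standard library. The class name, helper name, and column names are hypothetical, every column is assumed to be NULLABLE, and column names are assumed to be plain identifiers that need no JSON escaping. Note that if DynamicTables extends DynamicDestinations, the schema is normally returned from its getSchema method rather than set on the write transform, so treat the commented Beam call as one possible wiring, not the only one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class JsonSchemaSketch {

    // Builds a JSON-serialized table schema of the form
    // {"fields":[{"name":...,"type":...,"mode":"NULLABLE"}, ...]}.
    // Assumption: column names and types contain no characters that need escaping.
    static String toJsonSchema(Map<String, String> columns) {
        StringJoiner fields = new StringJoiner(",", "{\"fields\":[", "]}");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            fields.add(String.format(
                    "{\"name\":\"%s\",\"type\":\"%s\",\"mode\":\"NULLABLE\"}",
                    column.getKey(), column.getValue()));
        }
        return fields.toString();
    }

    public static void main(String[] args) {
        // Hypothetical columns; LinkedHashMap keeps them in insertion order.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("comment_id", "STRING");
        columns.put("created_at", "TIMESTAMP");

        String jsonSchema = toJsonSchema(columns);
        System.out.println(jsonSchema);

        // The string would then be handed to the write transform, for example:
        //   BigQueryIO.<TableRowWithSchema>write()
        //       .withJsonSchema(jsonSchema)
        //       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    }
}
```

With CREATE_IF_NEEDED and a schema available, the sink has what it needs to create a missing table before writing to it.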

Upvotes: 3
