Hi, I want to create tables dynamically, on the fly, in a Dataflow pipeline.
First, I capture the BigQueryIO WriteResult, then use it to create the table:
    WriteResult writeResult =
        incomingRecords.apply(
            "WriteToBigQuery",
            BigQueryIO.<TableRowWithSchema>write()
                .to(new DynamicTables())
                .withFormatFunction(TableRowWithSchema::getTableRow)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
But I still get a "table not found" exception:
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table projectId:datasetId.StreamComment",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table projectId:datasetId.StreamComment",
  "status" : "NOT_FOUND"
}
Is anything wrong? Thanks.
You have 2 things wrong:
1) If you want to create tables dynamically, you need to use the CREATE_IF_NEEDED disposition instead of CREATE_NEVER:
withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
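2) When a table is created dynamically, BigQuery needs its schema, so you have to pass it using the withSchema method or the withJsonSchema method. If DynamicTables extends DynamicDestinations, its getSchema(destination) method must return the schema for each table it routes to.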
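To make the two rules concrete, here is a toy, stdlib-only Java model of them. It is not Beam or BigQuery code: ToyDataset, ToyCreateDisposition and append are hypothetical names, and a table schema is reduced to a list of column names. It only encodes the behavior described above: CREATE_NEVER fails on a missing table with a "Not found" error like the one in the question, and CREATE_IF_NEEDED can create a table only when a schema is supplied.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for BigQueryIO.Write.CreateDisposition (toy model, not Beam).
enum ToyCreateDisposition { CREATE_NEVER, CREATE_IF_NEEDED }

// Hypothetical in-memory "dataset"; the schema is simplified to column names.
final class ToyDataset {
  // table name -> column names
  private final Map<String, List<String>> tables = new HashMap<>();

  boolean exists(String table) {
    return tables.containsKey(table);
  }

  /**
   * Models an append to `table` under the given disposition.
   * Rows themselves are not stored; only table creation is modeled.
   */
  void append(String table, ToyCreateDisposition disposition, Optional<List<String>> schema) {
    if (exists(table)) {
      return; // table already exists: the write simply appends
    }
    if (disposition == ToyCreateDisposition.CREATE_NEVER) {
      // Rule 1: a missing table is never created, so the write fails.
      throw new IllegalStateException("Not found: Table " + table);
    }
    // Rule 2: CREATE_IF_NEEDED can only create the table if a schema is given.
    List<String> columns = schema.orElseThrow(
        () -> new IllegalArgumentException("CREATE_IF_NEEDED needs a schema to create " + table));
    tables.put(table, List.copyOf(columns));
  }
}
```

For example, calling append("StreamComment", CREATE_NEVER, Optional.empty()) on an empty ToyDataset throws, whereas append("StreamComment", CREATE_IF_NEEDED, Optional.of(List.of("id", "text"))) creates the table, after which even CREATE_NEVER writes succeed because the table now exists.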
That will do the trick!