Reputation: 190
I have set up a GCloud Dataflow pipeline which consumes messages from a Pub/Sub subscription, converts them to table rows and writes those rows to a corresponding BigQuery table.
Table destination is decided based on the contents of the Pub/Sub message and will occasionally lead to the situation that a table does not exist yet and has to be created first. For this I use create disposition CREATE_IF_NEEDED
, which works great.
However, I have noticed that if I manually delete a newly created table in BigQuery while the Dataflow job is still running, Dataflow will get stuck and will not recreate the table. Instead I get an error:
Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) at
java.util.concurrent.FutureTask.get(FutureTask.java:191) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
If I go back to BigQuery and manually recreate this table, Dataflow job will continue working.
However, I am wondering if there is a way to instruct the Dataflow pipeline to recreate the table if it gets deleted during the job run?
Upvotes: 0
Views: 815
Reputation: 2825
This is not possible in the current BigqueryIO
connector. From the github link of the connector present here you will observe that for StreamingWriteFn
which is what your code, the table creation process is done in getOrCreateTable
and this is called in finishBundle
. There is a map of createdTables
that is maintained and in finishBundle
the table gets created if it not is already present, once it is present and stored in the hashmap it is not re-created as shown below:-
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
}
}
}
return tableReference;
}
For you to meet your requirement you might have to maintain your own BigqueryIO wherein you don't perform this specific check
if (!createdTables.contains(tableSpec)) {
The more important question though is why would the table get deleted in a production system by itself? This problem should be fixed rather than trying to re-create the table from Dataflow.
Upvotes: 1