Kodin

Reputation: 190

GCloud Dataflow recreate BigQuery table if it gets deleted during job run

I have set up a GCloud Dataflow pipeline which consumes messages from a Pub/Sub subscription, converts them to table rows and writes those rows to a corresponding BigQuery table.

The destination table is chosen based on the contents of the Pub/Sub message, so occasionally a table does not exist yet and has to be created first. For this I use the create disposition CREATE_IF_NEEDED, which works great.

However, I have noticed that if I manually delete a newly created table in BigQuery while the Dataflow job is still running, Dataflow will get stuck and will not recreate the table. Instead I get an error:

    Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115)
        at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)

If I go back to BigQuery and manually recreate the table, the Dataflow job continues working.

However, I am wondering if there is a way to instruct the Dataflow pipeline to recreate the table if it gets deleted during the job run?

GCloud Dataflow pipeline

Upvotes: 0

Views: 815

Answers (1)

Jayadeep Jayaraman

Reputation: 2825

This is not possible with the current BigQueryIO connector. In the connector's source on GitHub you can see that for StreamingWriteFn, which is what your pipeline uses, table creation is done in getOrCreateTable, which is called from finishBundle. The connector maintains a set of created tables (createdTables), and in finishBundle a table is created only if it is not already in that set. Once a table has been created and recorded there, it is not re-created, as shown below:

    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
        throws IOException {
      TableReference tableReference = parseTableSpec(tableSpec);
      if (!createdTables.contains(tableSpec)) {
        synchronized (createdTables) {
          // Another thread may have succeeded in creating the table in the meanwhile, so
          // check again. This check isn't needed for correctness, but we add it to prevent
          // every thread from attempting a create and overwhelming our BigQuery quota.
          if (!createdTables.contains(tableSpec)) {
            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableInserter inserter = new BigQueryTableInserter(client);
            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
            createdTables.add(tableSpec);
          }
        }
      }
      return tableReference;
    }

To meet your requirement, you would have to maintain your own version of BigQueryIO that does not perform this specific check:

    if (!createdTables.contains(tableSpec)) {
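
To make the effect of that check concrete, here is a minimal in-memory sketch. It is not Beam code: the class CreatedTablesCacheDemo, the existingTables set standing in for BigQuery, and the methods insert and insertWithoutCacheCheck are all hypothetical names made up for illustration. It only models the caching logic described above, assuming an insert fails whenever the table is missing.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the connector's caching logic; not actual Beam code.
class CreatedTablesCacheDemo {

    // Stand-in for BigQuery: the tables that currently exist.
    static final Set<String> existingTables = ConcurrentHashMap.newKeySet();

    // Stand-in for the connector's cache of tables it has already created.
    static final Set<String> createdTables = ConcurrentHashMap.newKeySet();

    // Mirrors the cached check: the create call happens only the first time.
    static void getOrCreateTable(String tableSpec) {
        if (!createdTables.contains(tableSpec)) {
            existingTables.add(tableSpec); // "create the table"
            createdTables.add(tableSpec);  // remember that we did
        }
    }

    // Cached behaviour: returns true if the insert would find its table.
    static boolean insert(String tableSpec) {
        getOrCreateTable(tableSpec);
        return existingTables.contains(tableSpec);
    }

    // Variant without the cache check: always issues create-if-needed.
    // In a real connector this would cost one extra API call per bundle.
    static boolean insertWithoutCacheCheck(String tableSpec) {
        existingTables.add(tableSpec); // no-op if the table already exists
        return existingTables.contains(tableSpec);
    }

    public static void main(String[] args) {
        String table = "project:dataset.events"; // made-up table spec

        System.out.println(insert(table));       // table created on first use
        existingTables.remove(table);            // simulate a manual delete
        System.out.println(insert(table));       // cache hides the deletion
        System.out.println(insertWithoutCacheCheck(table)); // table comes back
    }
}
```

In this model the second insert fails because the cache still lists the table as created, which matches the stuck job in the question. Dropping the check brings the table back, but, as the comment in the connector source notes, the cache exists to avoid exhausting the BigQuery quota, so a custom connector would need some other way to limit create calls.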

The more important question, though, is why a table would get deleted in a production system in the first place. That problem should be fixed, rather than trying to re-create the table from Dataflow.

Upvotes: 1
