Reputation: 15
We are testing with STORAGE_WRITE_API to insert data into BigQuery. We've seen several errors/warnings in our Dataflow pipeline(written in Java). It might work well in the beginning, but eventually the system lag would be increasing, it would stop processing any data from PubSub and the unacked messages piled up.
One common warning is:
Operation ongoing in step insertTableRowsToBigQuery/StorageApiLoads/StorageApiWriteSharded/Write Records for at least 03h35m00s without outputting or completing in state process
at [email protected]/jdk.internal.misc.Unsafe.park(Native Method)
at [email protected]/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
at [email protected]/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Callback.await(RetryManager.java:153)
at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Operation.await(RetryManager.java:136)
at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:256)
at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:453)
at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Other exceptions we've seen:
Got error io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is closed
Got error io.grpc.StatusRuntimeException: ALREADY_EXIST
PodSandboxStatus of sandbox "..." for pod "df-...-pipeline-...-harness-qw4j_default(...)" error: rpc error: code = Unknown desc = Error: No such container
Code sample:
toBq.apply("insertTableRowsToBigQuery",
BigQueryIO
.writeTableRows()
.to(String.format("%s:%s.%s", PROJECT_ID, DATASET, table))
.withTriggeringFrequency(Duration.standardSeconds(options.getTriggeringFrequency()))
.withNumStorageWriteApiStreams(options.getNumStorageWriteApiStreams())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Upvotes: 0
Views: 791
Reputation: 132
There was a production issue related to connection being stuck after streaming 10MB which has been fixed. If you try again, it should work.
Upvotes: 1