Vishwanath T R

Reputation: 35

Dataflow DoFn hangs unexpectedly when reading from Bigtable

Our Dataflow pipeline has a DoFn that reads from Bigtable using the HBase multi-get client API. This seems to cause Dataflow to stall randomly with the following stack:

Processing stuck in step AttachStuff/BigtableAttacher for at least 04h10m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523)
  at com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239)
  at com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129)
  at com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(Unknown Source)

We are on Beam 2.12.0. The DoFn initializes the Bigtable connection in @StartBundle.

Each DoFn invocation looks up no more than 10 keys from Bigtable.

It's a single cluster with 3 nodes and SSD storage. Storage utilization is 2.2 GB, max node CPU utilization is 13%, and max read/write rates are 2,000 reads/sec and 1,000 writes/sec.

startBundle:

bigtableConn = BigtableConfiguration.connect(
    config.getString(ConfigKeys.Google.PROJECT_ID),
    config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));

process:

List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
  Get get = new Get(Bytes.toBytes(s))
      .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
      .setMaxVersions(1);
  gets.add(get);
}
Result[] results = fooTable.get(gets);

teardown:

fooTable.close();
bigtableConn.close();

Upvotes: 0

Views: 599

Answers (1)

Igor Bernstein

Reputation: 581

I would recommend moving connection management to @Setup and @Teardown and using reference counting in case you are using multi-core workers.

Bigtable connections are very heavyweight and are intended to be a singleton per process. The HBase connection object returned by BigtableConfiguration.connect() actually wraps a gRPC channel pool with 2 channels per CPU, which is very expensive to construct.

You have a few options to improve your pipeline:

  1. Set the config option "google.bigtable.use.cached.data.channel.pool" to "true", which will reuse an internal connection pool (a short sketch is included at the end of this answer)

  2. Do something like this in your DoFn:

    // static vars shared by every instance of this DoFn in the worker process
    static Object connectionLock = new Object();
    static Connection bigtableConn = null;
    static int numWorkers = 0;

    // @Setup
    synchronized(connectionLock) {
      if (numWorkers++ == 0) {
        bigtableConn = BigtableConfiguration.connect(...);
      }
    }

    // @Teardown
    synchronized(connectionLock) {
      if (--numWorkers == 0) {
        bigtableConn.close();
      }
    }
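
Putting option 2 together, a full DoFn skeleton would look roughly like the following. This is a minimal sketch: the class name, element types, and project/instance/table ids are placeholders rather than values from your pipeline, and error handling is omitted.

    import java.io.IOException;

    import com.google.cloud.bigtable.hbase.BigtableConfiguration;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Table;

    public class BigtableAttacherFn extends DoFn<String, String> {

      // Shared by every instance of this DoFn in the worker process.
      private static final Object connectionLock = new Object();
      private static Connection bigtableConn = null;
      private static int numWorkers = 0;

      private transient Table fooTable;

      @Setup
      public void setup() throws IOException {
        synchronized (connectionLock) {
          if (numWorkers++ == 0) {
            // First DoFn instance on this worker opens the shared connection.
            bigtableConn = BigtableConfiguration.connect("my-project", "my-instance");
          }
          fooTable = bigtableConn.getTable(TableName.valueOf("foo_table"));
        }
      }

      @ProcessElement
      public void process(ProcessContext c) throws IOException {
        // Build the Gets and call fooTable.get(gets) exactly as you do today.
      }

      @Teardown
      public void teardown() throws IOException {
        synchronized (connectionLock) {
          fooTable.close();
          if (--numWorkers == 0) {
            // Last DoFn instance on this worker closes the shared connection.
            bigtableConn.close();
            bigtableConn = null;
          }
        }
      }
    }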
    

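For option 1, enabling the cached channel pool when you build the connection would look roughly like this (a sketch; it assumes the BigtableConfiguration.configure(...) and connect(Configuration) overloads of the bigtable-hbase client, and the project/instance ids are placeholders):

    // Build an HBase Configuration pre-populated with the Bigtable connector settings.
    org.apache.hadoop.conf.Configuration conf =
        BigtableConfiguration.configure("my-project", "my-instance");

    // Reuse the client's internal cached channel pool instead of creating a
    // new channel pool for every connection.
    conf.set("google.bigtable.use.cached.data.channel.pool", "true");

    bigtableConn = BigtableConfiguration.connect(conf);
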
Upvotes: 1
