Reputation: 35
Our Dataflow pipeline has a DoFn that reads from Bigtable using the HBase multi-get client API. This seems to cause Dataflow to stall randomly with the following stack trace:
Processing stuck in step AttachStuff/BigtableAttacher for at least 04h10m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523)
  at com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276)
  at com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239)
  at com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129)
  at com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39)
  at com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(Unknown Source)
We are on Beam 2.12.0. The DoFn initializes the Bigtable connection in @StartBundle.
Each DoFn invocation looks up no more than 10 keys from Bigtable.
It is a single cluster with 3 nodes and SSD storage. Storage utilization is 2.2 GB, max node CPU utilization is 13%, and max read/write rates are 2,000 reads/sec and 1,000 writes/sec.
startBundle:
bigtableConn = BigtableConfiguration.connect(
    config.getString(ConfigKeys.Google.PROJECT_ID),
    config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));
process:
List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
  Get get = new Get(Bytes.toBytes(s))
      .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
      .setMaxVersions(1);
  gets.add(get);
}
Result[] results = fooTable.get(gets);
teardown:
fooTable.close();
bigtableConn.close();
Upvotes: 0
Views: 599
Reputation: 581
I would recommend moving connection management to @Setup and @Teardown and using reference counting, in case you are using multi-core workers.
Bigtable connections are very heavyweight and are intended to be a singleton per process. The HBase Connection object returned by BigtableConfiguration.connect() actually wraps a gRPC channel pool with 2 channels per CPU, which is very expensive to construct.
You have a few options to improve your pipeline:
Set the config option "google.bigtable.use.cached.data.channel.pool" to "true", which will reuse an internal connection pool (shown in the combined sketch after the code below).
Do something like this in your DoFn:
// instance vars
static final Object connectionLock = new Object();
static Connection bigtableConn = null;
static int numWorkers = 0;

// @Setup
synchronized (connectionLock) {
  if (numWorkers++ == 0) {
    bigtableConn = BigtableConfiguration.connect(...);
  }
}

// @Teardown
synchronized (connectionLock) {
  if (--numWorkers == 0) {
    bigtableConn.close();
  }
}
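Putting both options together, here is a rough sketch of what the full DoFn could look like. This is my own illustration, not tested code: the class name, project/instance IDs, table and column-family names, element/output types, and any field not shown above are placeholders, and the cached-channel-pool setting is optional.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;

public class BigtableRefCountedDoFn extends DoFn<List<String>, Result> {

  // One Bigtable connection per worker process, shared by every DoFn instance on that worker.
  private static final Object connectionLock = new Object();
  private static Connection bigtableConn = null;
  private static int numWorkers = 0;

  private transient Table fooTable;

  @Setup
  public void setup() throws IOException {
    synchronized (connectionLock) {
      if (numWorkers++ == 0) {
        Configuration conf =
            BigtableConfiguration.configure("my-project-id", "my-instance-id"); // placeholders
        // Option 1 (optional): reuse the client's internal cached channel pool.
        conf.set("google.bigtable.use.cached.data.channel.pool", "true");
        bigtableConn = BigtableConfiguration.connect(conf);
      }
      fooTable = bigtableConn.getTable(TableName.valueOf("foo")); // placeholder table name
    }
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    List<Get> gets = new ArrayList<>();
    for (String key : c.element()) { // no more than 10 keys per element
      gets.add(new Get(Bytes.toBytes(key))
          .addFamily(Bytes.toBytes("cf")) // placeholder column family
          .setMaxVersions(1));
    }
    for (Result result : fooTable.get(gets)) {
      c.output(result);
    }
  }

  @Teardown
  public void teardown() throws IOException {
    synchronized (connectionLock) {
      fooTable.close();
      if (--numWorkers == 0) {
        bigtableConn.close();
      }
    }
  }
}

The point of the reference count is that the last DoFn instance torn down on a worker is the one that closes the shared connection, so the expensive channel pool is built once per process instead of once per bundle.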
Upvotes: 1