Reputation: 30819
I am storing time series values in Bigtable, and I have come across a use case where I need to apply a filter on these values and perform an aggregation. I am using the following configuration to get a connection to Bigtable (to perform range scans etc.):
Connection connection = BigtableConfiguration.connect(projectId, instanceId);
Table table = connection.getTable(TableName.valueOf(tableId));
table.getScanner(<a scanner with filter>);
This gives me a ResultScanner, and I can iterate over the rows. However, what I want to do is perform an aggregation on certain columns and get the values. The SQL equivalent of what I want would be:
SELECT SUM(A), SUM(B)
FROM table
WHERE C = D;
To do the same in HBase, I came across AggregationClient (javadoc here); however, it requires a Configuration, and I need something that runs against Bigtable (so that I don't need to use the low-level HBase APIs).
I checked the documentation and couldn't find anything (in Java) that could do this. Can anyone share an example of performing an aggregation with filters (on non-row-key columns, or any filters) on Bigtable?
Upvotes: 0
Views: 963
Reputation: 2711
Bigtable does not natively have any aggregation mechanisms. In addition, Bigtable has difficulty processing a condition like WHERE C = D, so that type of processing is generally better done on the client side.
AggregationClient relies on an HBase coprocessor, and Cloud Bigtable does not support coprocessors.
If you want to use Cloud Bigtable for this type of aggregation, you'll have to use table.getScanner() and your own logic. If the scale is large enough, you would have to use Dataflow or BigQuery to perform the aggregations.
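To make "your own logic" concrete, here is a minimal sketch of the client-side filter and sum in plain Java. It is not Bigtable API code: each row is modelled as a hypothetical map from column qualifier to raw bytes, standing in for the cells you would read from each Result while iterating the ResultScanner. It assumes the values are stored as 8-byte big-endian longs (the layout HBase's Bytes.toBytes(long) produces) and that D is a second column; if D is a constant, compare C against the encoded constant instead. The class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Client-side sketch of: SELECT SUM(A), SUM(B) FROM table WHERE C = D. */
final class ClientSideAggregation {

    /** Encodes a long as 8 big-endian bytes (assumed storage format). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes 8 big-endian bytes back into a long. */
    static long decode(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    /** Returns {SUM(A), SUM(B)} over the rows whose C cell equals their D cell. */
    static long[] sumWhereCEqualsD(Iterable<Map<String, byte[]>> rows) {
        long sumA = 0;
        long sumB = 0;
        for (Map<String, byte[]> row : rows) {
            byte[] c = row.get("C");
            byte[] d = row.get("D");
            // Skip rows where either column is missing or the values differ.
            if (c == null || d == null || !Arrays.equals(c, d)) {
                continue;
            }
            // Missing A or B cells contribute nothing, like NULL in SQL SUM.
            byte[] a = row.get("A");
            byte[] b = row.get("B");
            if (a != null) {
                sumA += decode(a);
            }
            if (b != null) {
                sumB += decode(b);
            }
        }
        return new long[] {sumA, sumB};
    }

    /** Hypothetical helper that builds one in-memory row for the demo. */
    static Map<String, byte[]> row(long a, long b, long c, long d) {
        Map<String, byte[]> cells = new HashMap<>();
        cells.put("A", encode(a));
        cells.put("B", encode(b));
        cells.put("C", encode(c));
        cells.put("D", encode(d));
        return cells;
    }

    public static void main(String[] args) {
        List<Map<String, byte[]>> rows = new ArrayList<>();
        rows.add(row(1, 10, 7, 7)); // C == D, kept
        rows.add(row(2, 20, 7, 8)); // C != D, skipped
        rows.add(row(3, 30, 5, 5)); // C == D, kept
        long[] sums = sumWhereCEqualsD(rows);
        System.out.println("SUM(A)=" + sums[0] + ", SUM(B)=" + sums[1]);
        // prints: SUM(A)=4, SUM(B)=40
    }
}
```

With the real client, the loop body would read the four cells from each Result instead of from a map. Only a running total is kept in memory, so this works for any scan size, but every matching row still travels to the client, which is why Dataflow or BigQuery becomes attractive at larger scale.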
Upvotes: 2
Reputation: 666
Here's one way, using an Apache Beam (Dataflow) pipeline that reads the data from BigQuery:
PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows()
.fromQuery("SELECT A, B FROM table;"));
PCollection<KV<String, Integer>> valuesA =
rows.apply(
MapElements.into(TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.integers()))
// BigQueryIO keys TableRow fields by column name and returns INTEGER values as strings
.via((TableRow row) -> KV.of(
"A", Integer.parseInt((String) row.get("A")))));
PCollection<KV<String, Integer>> valuesB =
rows.apply(
MapElements.into(TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.integers()))
// BigQueryIO keys TableRow fields by column name and returns INTEGER values as strings
.via((TableRow row) -> KV.of(
"B", Integer.parseInt((String) row.get("B")))));
PCollection<KV<String, Integer>> sums =
PCollectionList.of(valuesA).and(valuesB)
.apply(Flatten.pCollections())
.apply(Sum.integersPerKey());
Upvotes: 1