Darshan Mehta

Reputation: 30819

Perform aggregation in Dataflow

I am storing time series values in Bigtable, and I have come across a use case where I need to apply a filter on these values and perform an aggregation. I am using the following configuration to get a connection to Bigtable (to perform range scans, etc.):

Connection connection = BigtableConfiguration.connect(projectId, instanceId);
Table table = connection.getTable(TableName.valueOf(tableId)); 

// scan is a Scan configured with the filter
ResultScanner scanner = table.getScanner(scan);

This gives me a ResultScanner, and I can iterate over the rows. However, what I want to do is perform an aggregation on certain columns and get the results. The SQL equivalent of what I want would be:

SELECT SUM(A), SUM(B)
FROM table
WHERE C = D;

To do the same in HBase, I came across AggregationClient; however, it requires a Configuration, and I need something that runs against Bigtable (so that I don't need to use the low-level HBase APIs).

I checked the documentation and couldn't find anything (in Java) that could do this. Can anyone share an example of performing an aggregation with filters (on non-row-key columns or otherwise) on Bigtable?

Upvotes: 0

Views: 963

Answers (2)

Solomon Duskis

Reputation: 2711

Bigtable does not natively have any aggregation mechanisms. In addition, Bigtable has difficulty processing WHERE C = D, so that type of processing is generally better done on the client side.

AggregationClient is an HBase coprocessor. Cloud Bigtable does not support coprocessors.

If you want to use Cloud Bigtable for this type of aggregation, you'll have to scan the table with table.getScanner() and apply your own logic to the results. If the scale is large enough, you would have to use Dataflow or BigQuery to perform the aggregations.
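As a rough illustration of the "your own logic" part, here is a minimal sketch of the client-side equivalent of SELECT SUM(A), SUM(B) ... WHERE C = D. It uses only the Java standard library: each row is modeled as a Map from column name to a long value, standing in for what you would decode from each Result in the ResultScanner (for example with result.getValue(family, qualifier) and Bytes.toLong). The class name ClientSideAggregation, the method sumWhere, and the treatment of D as a constant value are assumptions made for the example, not part of any Bigtable API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ClientSideAggregation {

    /**
     * Sums the requested columns over every row whose filterColumn equals
     * filterValue. Rows without the filter column are skipped, and a missing
     * summed column contributes nothing to that column's total.
     */
    static Map<String, Long> sumWhere(Iterable<Map<String, Long>> rows,
                                      String filterColumn,
                                      long filterValue,
                                      String... sumColumns) {
        // Start every requested column at zero so it always appears in the result.
        Map<String, Long> sums = new LinkedHashMap<>();
        for (String column : sumColumns) {
            sums.put(column, 0L);
        }
        for (Map<String, Long> row : rows) {
            // The WHERE clause: keep only rows where filterColumn == filterValue.
            Long candidate = row.get(filterColumn);
            if (candidate == null || candidate != filterValue) {
                continue;
            }
            // The SUM(...) part: accumulate each requested column.
            for (String column : sumColumns) {
                Long value = row.get(column);
                if (value != null) {
                    sums.merge(column, value, Long::sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical rows; in practice these would be decoded from scan results.
        List<Map<String, Long>> rows = List.of(
            Map.of("A", 1L, "B", 10L, "C", 7L),
            Map.of("A", 2L, "B", 20L, "C", 8L),
            Map.of("A", 3L, "B", 30L, "C", 7L));
        System.out.println(sumWhere(rows, "C", 7L, "A", "B")); // prints {A=4, B=40}
    }
}
```

Because the whole scan is pulled through a single client, this only suits modest data volumes; beyond that, the same filter-then-sum logic would move into a Dataflow pipeline.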

Upvotes: 2

ACGray

Reputation: 666

Here's one way:

PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows()
  .fromQuery("SELECT A, B FROM table;"));

PCollection<KV<String, Integer>> valuesA =
  rows.apply(
    MapElements.into(TypeDescriptors.kvs(
      TypeDescriptors.strings(),
      TypeDescriptors.integers()))
      // readTableRows() exposes fields by name; INTEGER values arrive as Strings
      .via((TableRow row) -> KV.of(
        "A", Integer.parseInt((String) row.get("A")))));

PCollection<KV<String, Integer>> valuesB =
  rows.apply(
    MapElements.into(TypeDescriptors.kvs(
      TypeDescriptors.strings(),
      TypeDescriptors.integers()))
      // readTableRows() exposes fields by name; INTEGER values arrive as Strings
      .via((TableRow row) -> KV.of(
        "B", Integer.parseInt((String) row.get("B")))));

PCollection<KV<String, Integer>> sums =
  PCollectionList.of(valuesA).and(valuesB)
    .apply(Flatten.pCollections())
    .apply(Sum.integersPerKey());

Upvotes: 1
