RadhaKrishna

Reputation: 312

HBase update the existing row

I have my log data in HBase in the following format.

hbase source table

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1

dest table (in the destination table, the word is the row key and the summed count is a column; this is the data after running MapReduce on 2013/09/25):

------------------
word(table key) count
------------------
apple 7
oranges 6
mangoes 6

Data is added to the source table every day, but I don't want to run MapReduce over the entire source table each time, so I tried running MapReduce only over the data added on that day.

source table with new data added on 2013/09/26:

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
2013/09/26 apple 10
2013/09/26 oranges 10

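Restricting the job to one day's rows can be done by giving the table mapper a `Scan` with a key range, since the date is the row-key prefix. A sketch of the job setup, assuming the row keys are the literal date strings shown above; the table name `"source"` and class `MyMapper` are placeholders:

```java
// Only rows with keys in ["2013/09/26", "2013/09/27") reach the mapper.
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("2013/09/26"));  // inclusive lower bound
scan.setStopRow(Bytes.toBytes("2013/09/27"));   // exclusive upper bound
scan.setCaching(500);
scan.setCacheBlocks(false);  // recommended for MapReduce scans

TableMapReduceUtil.initTableMapperJob(
        "source",       // source table name (placeholder)
        scan,
        MyMapper.class, // the map class from the question
        Text.class,     // mapper output key type
        Result.class,   // mapper output value type
        job);
```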
When I run MapReduce only on the 2013/09/26 data, I get the following in the dest table.

dest table with new data (since the row keys are the same, the counts for apple and oranges are overwritten with the 2013/09/26 values; the old totals up to 2013/09/25 are lost):

------------------
word(table key) count
------------------
apple 10
oranges 10
mangoes 6 

Expected dest table:

------------------
word(table key) count
------------------
apple 17
oranges 16
mangoes 6 

Can I run MapReduce on just the partial data and add its count to the existing count column in the dest table, or do I need to run MapReduce over all the data every time?

If I can run MapReduce on partial data and update the count, how do I do it? Here are my map and reduce functions.

Map function:

public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
    String cf = "data";
    String column1 = "word";
    // Read the word from the current row; the Result parameter is named
    // "value" (the original code referenced a nonexistent "result").
    String word = Bytes.toString(value.getValue(Bytes.toBytes(cf), Bytes.toBytes(column1)));
    context.write(new Text(word), value);
}

Reduce function:

public void reduce(Text key, Iterable<Result> values, Context context)
        throws IOException, InterruptedException {
    int count = 0;
    String cf = "data";
    String column = "count";
    for (Result val : values) {
        // Each Result is one source row; use "val" (the original code
        // referenced a nonexistent "result" and dropped a semicolon).
        count += Integer.parseInt(Bytes.toString(val.getValue(Bytes.toBytes(cf), Bytes.toBytes(column))));
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(String.valueOf(count)));
    context.write(null, put);
}

Upvotes: 0

Views: 5117

Answers (2)

Arnon Rotem-Gal-Oz

Reputation: 25909

When using HBase you can treat a column as a counter, which you can update with Increment or incrementColumnValue. The nice feature is that each increment is atomic, so you can increment from multiple sources (maps) concurrently and the total will still be correct.

To use that in a map (or reduce) you need to write to HBase yourself rather than through the context: open the table in the setup method and close it (or even apply the total increment) in the cleanup method.
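The setup/map/cleanup pattern described above might look like the following sketch, written against the HTable API of the HBase 0.94 era; the dest table name and the column names mirror the question, but the class name is a placeholder:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class IncrementMapper extends TableMapper<ImmutableBytesWritable, Result> {
    private HTable destTable;

    @Override
    protected void setup(Context context) throws IOException {
        // Open the destination table once per task, not once per row.
        destTable = new HTable(context.getConfiguration(), "dest");
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException {
        byte[] cf = Bytes.toBytes("data");
        String word = Bytes.toString(value.getValue(cf, Bytes.toBytes("word")));
        long delta = Long.parseLong(Bytes.toString(value.getValue(cf, Bytes.toBytes("count"))));
        // Atomic server-side increment: correct even with concurrent mappers,
        // so no reducer is needed at all.
        destTable.incrementColumnValue(Bytes.toBytes(word), cf, Bytes.toBytes("count"), delta);
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        destTable.close();
    }
}
```

One caveat: counter cells are stored as 8-byte longs (read back with Bytes.toLong), not the string-encoded numbers the question's Put writes, so the dest column's format changes if you switch to counters.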

Upvotes: 1

Tariq

Reputation: 34184

The data is not gone anywhere. Since you are putting data into the same cell, it is stored as a new version, and a scan shows only the latest version by default. You need to write logic that adds the new count to the previous count and then inserts the final value into the table.

If you do not want to keep multiple versions, you need to handle that yourself by deleting the older versions before you put the final count into the table.
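The read-add-write logic described above can be sketched as a variant of the question's reducer; it assumes the dest table is named "dest" and uses the question's "data:count" column (opening the table in setup/cleanup instead of per call would be more efficient, but this keeps the sketch self-contained):

```java
public void reduce(Text key, Iterable<Result> values, Context context)
        throws IOException, InterruptedException {
    byte[] cf = Bytes.toBytes("data");
    byte[] col = Bytes.toBytes("count");

    // Sum the counts from today's partial input, as in the question.
    int count = 0;
    for (Result val : values) {
        count += Integer.parseInt(Bytes.toString(val.getValue(cf, col)));
    }

    // Read the previous total (if any) from the destination table and add it.
    HTable dest = new HTable(context.getConfiguration(), "dest");
    Result existing = dest.get(new Get(Bytes.toBytes(key.toString())));
    if (!existing.isEmpty()) {
        count += Integer.parseInt(Bytes.toString(existing.getValue(cf, col)));
    }
    dest.close();

    // Write back the combined total; this supersedes the old version.
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(cf, col, Bytes.toBytes(String.valueOf(count)));
    context.write(null, put);
}
```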

Upvotes: 0
