I have a Spark Streaming job that performs some aggregation. I now want to insert the resulting records into HBase, but not as a plain insert: I want an UPSERT, so that if the row key already exists, the column value becomes the sum of the old and new values (newValue + oldValue). Can anybody share pseudocode in Java showing how I can achieve this?
I found a way. The pseudocode is below:
=========== For UPSERT (Update and Insert) ===========
public void HbaseUpsert(JavaRDD<Row> javaRDD) throws IOException, ServiceException {
    JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = javaRDD.mapToPair(
        new PairFunction<Row, ImmutableBytesWritable, Put>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception {
                // Opening a table per record is expensive; it is kept here for simplicity.
                HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST");
                try {
                    String Column1 = row.getString(1);
                    long Column2 = row.getLong(2);
                    byte[] rowKey = Bytes.toBytes(row.getString(0));
                    Result result = table.get(new Get(rowKey));
                    if (!result.isEmpty()) {
                        Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"));
                        if (cell != null) {
                            // Row key already exists: add the stored value to the new one.
                            Column2 += Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
                        }
                    }
                    // Build the Put whether or not the row existed (insert or update).
                    Put put = new Put(rowKey);
                    put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1));
                    put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2));
                    return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                } finally {
                    // Always release the table; exceptions propagate to Spark instead of being swallowed.
                    table.close();
                }
            }
        });
    // hbasePuts1 still has to be written to HBase; that step is not shown here.
}
============== For Configuration ===============
public class HbaseConfigurationReader implements Serializable {
    static Job newAPIJobConfiguration1 = null;
    private static Configuration conf = null;
    private static HTable table = null;
    private static HbaseConfigurationReader instance = null;
    private static Log logger = LogFactory.getLog(HbaseConfigurationReader.class);

    HbaseConfigurationReader() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
    }

    public static HbaseConfigurationReader getInstance() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
        if (instance == null) {
            instance = new HbaseConfigurationReader();
        }
        return instance;
    }

    public static Configuration initializeHbaseConfiguration() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
        if (conf == null) {
            // Build the configuration once and reuse it on later calls.
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "localhost");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            table = new HTable(conf, "TEST");
            conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST");
            try {
                newAPIJobConfiguration1 = Job.getInstance(conf);
                newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "TEST");
            } catch (IOException e) {
                logger.error("Could not create the job configuration", e);
                throw e;
            }
        } else {
            logger.info("Configuration already initialized");
        }
        return newAPIJobConfiguration1.getConfiguration();
    }
}
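The upsert rule itself (insert when the row key is new, otherwise store old + new) can be checked in isolation before wiring it to HBase. The following is only a minimal sketch under stated assumptions: the class `UpsertSketch` and its in-memory `HashMap` are hypothetical stand-ins for the table, not part of the HBase API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the upsert rule; a HashMap stands in for the HBase table.
final class UpsertSketch {
    // Row key -> currently stored counter value.
    private final Map<String, Long> store = new HashMap<>();

    // Inserts newValue if rowKey is absent, otherwise stores oldValue + newValue.
    // Returns the value that ends up stored for rowKey.
    long upsert(String rowKey, long newValue) {
        return store.merge(rowKey, newValue, Long::sum);
    }

    // Returns the stored value, or null if the row key was never written.
    Long get(String rowKey) {
        return store.get(rowKey);
    }
}
```

Calling `upsert("row1", 10)` and then `upsert("row1", 5)` leaves 15 stored under `row1`, which is the behaviour the Get-then-Put code is meant to reproduce against the real table.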
Something like this...
byte[] rowKey = null; // Provided
Table table = null; // Provided
long newValue = 1000; // Provided
byte[] FAMILY = new byte[]{0}; // Defined
byte[] QUALIFIER = new byte[]{1}; // Defined
try {
    Get get = new Get(rowKey);
    Result result = table.get(get);
    if (!result.isEmpty()) {
        Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER);
        if (cell != null) {
            // Row exists: add the stored value to the new one.
            newValue += Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
        }
    }
    // Write the (possibly summed) value back, whether or not the row existed.
    Put put = new Put(rowKey);
    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(newValue));
    table.put(put);
} catch (Exception e) {
    // Handle Exceptions...
}
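A note on why the value offset is passed when decoding: `cell.getValueArray()` returns the cell's whole backing array, so the 8-byte value has to be read starting at `cell.getValueOffset()`. Here is a small sketch of that decoding using only `java.nio.ByteBuffer`, assuming the long was stored as 8 big-endian bytes (which is how `Bytes.toBytes(long)` encodes it). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing offset-based decoding of a long from a larger backing array.
final class LongCellSketch {
    // Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Reads 8 bytes starting at offset and decodes them as a big-endian long.
    static long decode(byte[] backing, int offset) {
        return ByteBuffer.wrap(backing, offset, Long.BYTES).getLong();
    }
}
```

If the encoded value sits three bytes into a larger array, `decode(backing, 3)` recovers it, while decoding from offset 0 would read unrelated bytes.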
We (Splice Machine [Open Source]) have some pretty cool tutorials on using Spark Streaming to store data in HBase.
Check them out; they might be interesting.
