Reputation: 20319
Currently I have two HBase tables (let's call them tableA and tableB). Using a single-stage MapReduce job, the data in tableA is read, processed, and saved to tableB. Both tables currently reside on the same HBase cluster. However, I need to relocate tableB to its own cluster.
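For reference, the job is currently wired up along these lines (the driver, mapper, and reducer class names are just illustrative stand-ins for my actual code):

public class MyDriver { // illustrative driver class
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(HBaseConfiguration.create(), "tableA-to-tableB");
        job.setJarByClass(MyDriver.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // larger scanner caching for MapReduce scans
        scan.setCacheBlocks(false);  // don't fill the block cache from a batch job

        // Read from tableA, emit Puts, and write them to tableB on the same cluster
        TableMapReduceUtil.initTableMapperJob("tableA", scan,
                MyMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("tableB", MyTableReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}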
Is it possible to configure a single stage map reduce job in Hadoop to read and write from separate instances of HBase?
Upvotes: 9
Views: 615
Reputation: 3067
It is possible. HBase's CopyTable MapReduce job does it by using TableMapReduceUtil.initTableReducerJob(), which allows you to set an alternative quorumAddress in case you need to write to remote clusters:
public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
quorumAddress - Distant cluster to write to; default is null for output to the cluster that is designated in hbase-site.xml. Set this String to the ZooKeeper ensemble of an alternate remote cluster when you would have the reduce write to a cluster that is other than the default; e.g. copying tables between clusters, the source would be designated by hbase-site.xml and this param would have the ensemble address of the remote cluster. The format to pass is particular. Pass &lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt; such as server,server2,server3:2181:/hbase.
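As a rough sketch of how that call fits into a job (the table names and quorum string are placeholders; like CopyTable, this runs map-only, so the reducer class is null):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class CopyToRemote {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(HBaseConfiguration.create(), "tableA-to-remote-tableB");
        job.setJarByClass(CopyToRemote.class);

        // The mapper reads tableA from the cluster configured in hbase-site.xml
        TableMapReduceUtil.initTableMapperJob("tableA", new Scan(),
                Import.Importer.class, null, null, job);

        // quorumAddress redirects the output table to the remote cluster
        TableMapReduceUtil.initTableReducerJob(
                "tableB",                              // output table on the remote cluster
                null,                                  // no reducer: map-only, as CopyTable does
                job,
                null,                                  // default partitioner
                "server1,server2,server3:2181:/hbase", // <quorum>:<client port>:<znode parent>
                null, null);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}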
Another option is to implement your own custom reducer to write to the remote table instead of writing to the context. Something similar to this:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class MyReducer extends Reducer<Text, Result, Text, Text> {

    protected Connection connection;
    protected BufferedMutator remoteTable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Clone the job configuration and point it at the remote cluster's ZooKeeper quorum
        Configuration config = HBaseConfiguration.create(context.getConfiguration());
        config.set("hbase.zookeeper.quorum", "quorum1,quorum2,quorum3");
        connection = ConnectionFactory.createConnection(config); // HBase 1.0+
        // On HBase <0.99 use HConnectionManager.createConnection(config) and an
        // HTableInterface with setAutoFlush(false)/setWriteBufferSize() instead.
        // A BufferedMutator buffers Puts client-side, replacing HTable.setAutoFlush(false)
        remoteTable = connection.getBufferedMutator(
                new BufferedMutatorParams(TableName.valueOf("myTable"))
                        .writeBufferSize(10L * 1024L * 1024L)); // 10MB buffer
    }

    @Override
    public void reduce(Text boardKey, Iterable<Result> results, Context context)
            throws IOException, InterruptedException {
        /* Write puts to remoteTable */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (remoteTable != null) {
            remoteTable.flush(); // push any buffered mutations before closing
            remoteTable.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
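The body of reduce() depends on what your job computes, but assuming a plain copy, a hypothetical implementation (it also needs the Put and Cell imports) could replay each scanned Result as a Put:

@Override
public void reduce(Text boardKey, Iterable<Result> results, Context context)
        throws IOException, InterruptedException {
    for (Result result : results) {
        Put put = new Put(result.getRow());
        for (Cell cell : result.rawCells()) {
            put.add(cell); // re-attach the original cell to the new Put (HBase 1.x API)
        }
        remoteTable.mutate(put); // buffered write to the remote table
    }
}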
Upvotes: 4