slayton

Reputation: 20319

How can I read from one HBase instance but write to another?

Currently I have two HBase tables (let's call them tableA and tableB). A single-stage MapReduce job reads the data in tableA, processes it, and saves it to tableB. Both tables currently reside on the same HBase cluster, but I need to relocate tableB to its own cluster.

Is it possible to configure a single-stage MapReduce job in Hadoop to read from one HBase instance and write to another?

Upvotes: 9

Views: 615

Answers (1)

Rubén Moraleda

Reputation: 3067

Yes, it is possible. HBase's CopyTable MapReduce job does this with TableMapReduceUtil.initTableReducerJob(), which accepts an alternative quorumAddress for writing to a remote cluster:

public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)

quorumAddress - Distant cluster to write to; default is null for output to the cluster that is designated in hbase-site.xml. Set this String to the zookeeper ensemble of an alternate remote cluster when you would have the reduce write to a cluster that is other than the default; e.g. copying tables between clusters, the source would be designated by hbase-site.xml and this param would have the ensemble address of the remote cluster. The format to pass is particular. Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent> such as server,server2,server3:2181:/hbase.
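
Since the quorumAddress format is easy to get wrong (quorum hosts, client port, and znode parent joined by colons, as in server,server2,server3:2181:/hbase), here is a minimal sketch of assembling it. ClusterKeyExample and its format method are hypothetical names for illustration, not part of HBase. The commented-out call at the end mirrors how CopyTable wires up a map-only job, and assumes HBase is on the classpath and that job is an already configured Job.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not an HBase class): builds the string expected by the
// quorumAddress parameter, i.e. "<quorum hosts>:<client port>:<znode parent>".
class ClusterKeyExample {

    static String format(List<String> quorumHosts, int clientPort, String znodeParent) {
        if (quorumHosts == null || quorumHosts.isEmpty()) {
            throw new IllegalArgumentException("at least one ZooKeeper host is required");
        }
        if (znodeParent == null || !znodeParent.startsWith("/")) {
            throw new IllegalArgumentException("znode parent must start with '/'");
        }
        // Hosts are comma-separated; the three parts are colon-separated.
        return String.join(",", quorumHosts) + ":" + clientPort + ":" + znodeParent;
    }

    public static void main(String[] args) {
        String quorumAddress = format(Arrays.asList("server", "server2", "server3"), 2181, "/hbase");
        System.out.println(quorumAddress); // prints server,server2,server3:2181:/hbase

        // Assumed usage for a map-only job that writes to the remote cluster:
        // TableMapReduceUtil.initTableReducerJob("tableB", null, job, null, quorumAddress, null, null);
        // job.setNumReduceTasks(0);
    }
}
```

With this approach the source cluster still comes from the hbase-site.xml on the job's classpath, and only the output side is redirected.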


Another option is to implement your own custom reducer to write to the remote table instead of writing to the context. Something similar to this:

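import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class MyReducer extends Reducer<Text, Result, Text, Text> {

    protected Connection connection;
    protected BufferedMutator remoteTable; // buffered writer for the remote table

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Clone the job configuration and point the copy at the remote cluster's quorum.
        // Also set hbase.zookeeper.property.clientPort and zookeeper.znode.parent if they differ.
        Configuration config = HBaseConfiguration.create(context.getConfiguration());
        config.set("hbase.zookeeper.quorum", "quorum1,quorum2,quorum3");
        // HBase 1.0+ client API. On older clients, use HConnectionManager.createConnection(config)
        // and an HTableInterface with setAutoFlush(false) / flushCommits() instead.
        connection = ConnectionFactory.createConnection(config);
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("myTable"))
                .writeBufferSize(1024L * 1024L * 10L); // 10MB buffer
        remoteTable = connection.getBufferedMutator(params);
    }

    @Override
    public void reduce(Text boardKey, Iterable<Result> results, Context context) throws IOException, InterruptedException {
        /* Write puts to remoteTable, e.g. remoteTable.mutate(put) */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (remoteTable != null) {
            remoteTable.flush(); // push any buffered puts before closing
            remoteTable.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}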

Upvotes: 4
