Reputation: 51
I Have 6.5 million rowkeys and want to retrieve data from hbase in spark-job. How do i retrieve result from hbase parallely?
i don't think this snippet will run on executors.
List<Get> listOFGets = new ArrayList<Get>();
Result[] results = Htable.get(listOFGets);
Upvotes: 1
Views: 1613
Reputation: 2495
I generally use an hbase scan, with the .newAPIHadoopRDD()
method. Beware this is a pretty ugly mix of scala with java apis. You can pass in an arbitrary list of row keys (an empty list returns all records in the table). If your row keys are not Long encoded, then you may have to modify the code a bit.
def hbaseScan(ids: List[Long]): Dataset[Result] = {
val ranges = ListBuffer[MultiRowRangeFilter.RowRange]()
//converts each id (Long) into a one element RowRange
//(id gets implicitly get converted to byte[])
ids.foreach(i => {
ranges += new MultiRowRangeFilter.RowRange(i, true, i + 1, false)
})
val scan = new Scan()
scan.setCaching(1000) /* fetch 1000 records in each trip to hbase */
scan.setCacheBlocks(false) /* don't waste hbase cache space, since we are scanning whole table
if (ranges.nonEmpty) {
//The list of RowRanges is sorted and merged into a single scan filter
scan.setFilter(new MultiRowRangeFilter(MultiRowRangeFilter.sortAndMerge(ranges.asJava)))
}
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE /*set your table name here*/)
conf.set(TableInputFormat.SCAN, scan)
spark.sparkContext.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
).toDF("result").as[Result]
}
This will return an Dataset[Result]
with the same number of partitions as there are regions in the scanned table. Apologies, I don't have any equivalent java code to share.
Edit: addressing not right way comment
I should have prefaced that this method works best when either reading the entire hbase table, or a small number of arbitrary row keys. My use case is doing exactly both of those, as I always query 1000 row-keys at a time, or the whole table, nothing in-between.
If your number of arbitrary row-keys is large, there will be a single core hang up on the MultiRowRangeFilter.sortAndMerge()
step. This method could be expanded to parallelize the process of sorting and merging the list of keys into key ranges before creating the Filter
for the scan. After the sort and merge, this method is indeed parallel across as many partitions as you have regions, and even reduces the number of round trips to hbase if you have many contiguous row-key ranges.
It's hard to say if that process would be efficient for you than spreading random gets across the cluster, as it wholly depends on many factors: record size, table size, row-key ranges, etc. I am confident that for many use cases this method would be more efficient, but obviously not for every use case.
Upvotes: 1
Reputation: 2373
You can run parallel scans on executors by creating an RDD
(in whichever way suits you) and then calling the method .foreachPartition
of the JavaHBaseContext
object. This way, the HBaseContext
will pass the Connection
instance details to each instance of your function class and inside that function you could do the scan by getting the table, etc.
It's up to you how to create the RDD to accommodate this, and how many elements it should have (just make sure it has as many partitions as you want to do of parallel scans). In my experience, as many concurrent tasks you can run on spark, that many parallel scans you can do (depending on your HBase cluster strength, of course).
Java code could look something like this:
On master:
JavaHBaseContext hBaseContext = new JavaHBaseContext(sparkContext, HBaseConfig);
JavaRDD<blah> myRDD = ... (create an RDD with a number of elements)
hBaseContext.foreachPartition(myRDD, new MyParFunction());
Your function class will then look something like this:
class MyParFunction implements VoidFunction<Tuple2<Iterator<blah>, Connection>>
{
@Override
public void call(Tuple2<Iterator<blah>, Connection> t) throws Exception
{
// Do your scan here, since you have the Connection object t
}
}
This should run the scans in parallel on all executors
Upvotes: 0