Atul Verma

Reputation: 51

How to do parallel reading from HBase using a list of row keys in Spark

I have 6.5 million row keys and want to retrieve the data from HBase in a Spark job. How do I retrieve the results from HBase in parallel?

I don't think this snippet will run on the executors.

List<Get> listOfGets = new ArrayList<>();
Result[] results = hTable.get(listOfGets);

Upvotes: 1

Views: 1613

Answers (2)

Travis Hegner

Reputation: 2495

I generally use an HBase scan with the .newAPIHadoopRDD() method. Beware, this is a pretty ugly mix of Scala and Java APIs. You can pass in an arbitrary list of row keys (an empty list returns all records in the table). If your row keys are not Long encoded, you may have to modify the code a bit.

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}

import org.apache.spark.sql.Dataset

def hbaseScan(ids: List[Long]): Dataset[Result] = {
  val ranges = ListBuffer[MultiRowRangeFilter.RowRange]()
  // converts each id (Long) into a one-element RowRange
  // (the id is implicitly converted to byte[])
  ids.foreach(i => {
    ranges += new MultiRowRangeFilter.RowRange(i, true, i + 1, false)
  })

  val scan = new Scan()
  scan.setCaching(1000) /* fetch 1000 records in each trip to hbase */
  scan.setCacheBlocks(false) /* don't waste hbase cache space, since we are scanning the whole table */

  if (ranges.nonEmpty) {
    // The list of RowRanges is sorted and merged into a single scan filter
    scan.setFilter(new MultiRowRangeFilter(MultiRowRangeFilter.sortAndMerge(ranges.asJava)))
  }

  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE /* set your table name here */)
  // the Scan has to be serialized to a String before it can go into the Hadoop conf
  conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

  spark.sparkContext.newAPIHadoopRDD(
    conf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  ).toDF("result").as[Result]
}

This will return a Dataset[Result] with the same number of partitions as there are regions in the scanned table. Apologies, I don't have any equivalent Java code to share.
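
For completeness, calling it might look something like this (just a sketch; the ids below are placeholders, and an empty list scans the whole table as noted above):

val someIds: List[Long] = List(1L, 2L, 42L)   // whatever row keys you need
val rows = hbaseScan(someIds)
println(s"fetched ${rows.count()} rows")

val wholeTable = hbaseScan(List.empty[Long])  // empty list == full table scan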

Edit: addressing the "not the right way" comment

I should have prefaced this by saying that the method works best when reading either the entire HBase table or a small number of arbitrary row keys. My use case is exactly both of those: I always query 1000 row keys at a time, or the whole table, and nothing in between.

If your number of arbitrary row keys is large, a single core will hang on the MultiRowRangeFilter.sortAndMerge() step. This method could be expanded to parallelize the process of sorting and merging the list of keys into key ranges before creating the Filter for the scan (see the sketch below). After the sort and merge, this method is indeed parallel across as many partitions as you have regions, and it even reduces the number of round trips to HBase if you have many contiguous row-key ranges.
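
As a rough sketch of that idea (only an outline, assuming the same implicit Long-to-byte[] conversion as in hbaseScan, a SparkSession named spark, and a hypothetical helper name buildRanges): let Spark deduplicate and sort the keys, then merge contiguous ids into RowRanges on the driver instead of calling sortAndMerge():

def buildRanges(ids: List[Long]): java.util.List[MultiRowRangeFilter.RowRange] = {
  val ranges = ListBuffer[MultiRowRangeFilter.RowRange]()
  // distributed dedup + sort; only the ordered keys come back to the driver
  val sorted = spark.sparkContext.parallelize(ids).distinct().sortBy(identity).collect()
  if (sorted.nonEmpty) {
    var start = sorted.head
    var prev = sorted.head
    sorted.tail.foreach { i =>
      if (i != prev + 1) {
        // gap found: close the current contiguous range and start a new one
        ranges += new MultiRowRangeFilter.RowRange(start, true, prev + 1, false)
        start = i
      }
      prev = i
    }
    ranges += new MultiRowRangeFilter.RowRange(start, true, prev + 1, false)
  }
  ranges.asJava
}

The resulting list is already sorted and non-overlapping, so it could be passed straight to new MultiRowRangeFilter(...) in place of the sortAndMerge() call.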

It's hard to say whether that process would be more efficient for you than spreading random gets across the cluster, as it wholly depends on many factors: record size, table size, row-key ranges, etc. I am confident that for many use cases this method would be more efficient, but obviously not for every one.

Upvotes: 1

VS_FF

Reputation: 2373

You can run parallel scans on the executors by creating an RDD (in whichever way suits you) and then calling the .foreachPartition method of the JavaHBaseContext object. This way, the HBaseContext passes the Connection instance details to each instance of your function class, and inside that function you can do the scan by getting the table, etc.

It's up to you how to create the RDD and how many elements it should have (just make sure it has as many partitions as the number of parallel scans you want to do). In my experience, you can do as many parallel scans as there are concurrent tasks you can run on Spark (depending on the strength of your HBase cluster, of course).

Java code could look something like this:

On master:

JavaHBaseContext hBaseContext = new JavaHBaseContext(sparkContext, HBaseConfig);
JavaRDD<blah> myRDD = ... (create an RDD with a number of elements)
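// e.g. (purely illustrative, hypothetical names): 32 partitions -> up to 32 parallel scans, one per partition
// JavaRDD<byte[]> myRDD = sparkContext.parallelize(rowKeyList, 32);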
hBaseContext.foreachPartition(myRDD,  new MyParFunction());

Your function class will then look something like this:

class MyParFunction implements VoidFunction<Tuple2<Iterator<blah>, Connection>> {
    @Override
    public void call(Tuple2<Iterator<blah>, Connection> t) throws Exception {
        // Do your scan here, since you have the Connection object t
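        // For example (purely a sketch; the table name and key handling here are hypothetical):
        //   Table table = t._2().getTable(TableName.valueOf("my_table"));
        //   while (t._1().hasNext()) {
        //       Result r = table.get(new Get(Bytes.toBytes(t._1().next().toString())));
        //       // ... process r ...
        //   }
        //   table.close();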
    }
}

This should run the scans in parallel on all executors.

Upvotes: 0
