doubts

Reputation: 1882

Does Spark utilize the sorted order of HBase keys when using HBase as a data source?

I store time-series data in HBase. The rowkey is composed of user_id and timestamp, like this:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
        }
    },
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
        }
    },
    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
        }
    }
}
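
For reference, a minimal sketch of how such a rowkey could be built and split back apart (the helper names below are made up for illustration, not part of my actual code):

def make_rowkey(userid, timestamp):
    # hypothetical helper: for the string keys to sort chronologically per
    # user, the timestamp part should have a fixed width (as it does here)
    return "%s-%d" % (userid, timestamp)

def parse_rowkey(rowkey):
    # inverse of make_rowkey: split back into (userid, timestamp)
    userid, timestamp = rowkey.split('-')
    return userid, int(timestamp)

print make_rowkey("userid1", 1428364800)   # userid1-1428364800
print parse_rowkey("userid1-1428364800")   # ('userid1', 1428364800)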

Now I need to perform per-user analysis. Here is the initialization of hbase_rdd (from here)

sc = SparkContext(appName="HBaseInputFormat")

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

The natural mapreduce-like way to process would be:

(hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # shift timestamp from key to value
   .groupByKey()
   .map(processUserData))  # process user's data

While executing the first map (shifting the timestamp from the key to the value), it would be crucial to know when the time-series data of the current user ends, so that the groupByKey transformation could start for that user. Then we would not need to map over the whole table and keep all the temporary data. This should be possible because HBase stores row keys in sorted order.

With Hadoop Streaming it could be done like this:

import sys

current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.rstrip('\n').split('\t')
    userid, timestamp = k.split('-')
    if userid != last_userid:
        # the previous user's time series is complete -- process it
        if current_user_data:
            print processUserData(last_userid, current_user_data)
        last_userid = userid
        current_user_data = [(timestamp, v)]
    else:
        current_user_data.append((timestamp, v))

# process the last user's data
if current_user_data:
    print processUserData(last_userid, current_user_data)

The question is: how can the sorted order of HBase keys be utilized within Spark?

Upvotes: 4

Views: 731

Answers (1)

Imran Rashid

Reputation: 753

I'm not super familiar with the guarantees you get with the way you're pulling data from HBase, but if I understand correctly, I can answer with just plain old Spark.

You've got some RDD[X]. As far as Spark knows, the Xs in that RDD are completely unordered. But you have some outside knowledge, and you can guarantee that the data is in fact grouped by some field of X (and perhaps even sorted by another field).

In that case, you can use mapPartitions to do virtually the same thing you did with hadoop streaming. That lets you iterate over all the records in one partition, so you can look for blocks of records w/ the same key.

val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRDD.mapPartitions { itr =>
  var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
  var currentUser: X = null
  // itr is an iterator over *all* the records in one partition
  itr.flatMap { x =>
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before -- add the data to our list
      currentUserData += x
      None
    } else {
      // it's a new user -- return all the data for the previous user, and
      // start another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      if (userDataGrouped.nonEmpty) Some(userDataGrouped) else None  // nothing to emit before the very first user
    }
  } ++ {
    // evaluated lazily once the partition iterator is exhausted,
    // so this emits the buffer of the partition's last user
    if (currentUserData.nonEmpty) Iterator(currentUserData) else Iterator.empty
  }
}
// now groupedData has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedData.filter { userData => userData.size > 10 }

I realize you wanted to use Python -- sorry, I figure I'm more likely to get the example correct if I write it in Scala. And I think the type annotations make the meaning more clear, but that is probably a Scala bias ... :). In any case, hopefully you can understand what is going on and translate it. (Don't worry too much about flatMap & Some & None, it's probably unimportant if you understand the idea ...)
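
For reference, a rough PySpark sketch of the same mapPartitions idea, assuming each record from hbase_rdd arrives as a (rowkey_string, value_string) pair as set up in the question (the function name group_users_in_partition is made up for this sketch):

def group_users_in_partition(records):
    # records is an iterator over *all* the (rowkey, value) pairs in one partition
    current_userid = None
    current_user_data = []
    for rowkey, value in records:
        userid, timestamp = rowkey.split('-')
        if userid == current_userid:
            # same user as before -- keep accumulating
            current_user_data.append((timestamp, value))
        else:
            # a new user -- emit the previous user's block, if any
            if current_user_data:
                yield (current_userid, current_user_data)
            current_userid = userid
            current_user_data = [(timestamp, value)]
    # emit the last user's block in this partition
    if current_user_data:
        yield (current_userid, current_user_data)

grouped_rdd = hbase_rdd.mapPartitions(group_users_in_partition)

As with the Scala version, this only groups records within each partition; if one user's rows were to span two partitions, that user would show up as two groups, so an extra merge by userid might still be needed in that case.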

Upvotes: 2
