Reputation: 1882
I store time-series
data in HBase
. The rowkey is composed from user_id
and timestamp
, like this:
{
"userid1-1428364800" : {
"columnFamily1" : {
"val" : "1"
}
}
}
"userid1-1428364803" : {
"columnFamily1" : {
"val" : "2"
}
}
}
"userid2-1428364812" : {
"columnFamily1" : {
"val" : "abc"
}
}
}
}
Now I need to perform per-user analysis. Here is the initialization of hbase_rdd
(from here)
sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
The natural mapreduce-like way to process would be:
hbase_rdd
.map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value
.groupByKey()
.map(processUserData) # process user's data
While executing first map (shift timestamp from key to value) it is crucial to know when the time-series data of the current user is finished and therefore groupByKey transformation could be started. Thus we do not need to map over all table and store all the temporary data. It is possible because hbase stores row-keys in a sorted order.
With hadoop streaming it could be done in such way:
import sys
current_user_data = []
last_userid = None
for line in sys.stdin:
k, v = line.split('\t')
userid, timestamp = k.split('-')
if userid != last_userid and current_user_data:
print processUserData(last_userid, current_user_data)
last_userid = userid
current_user_data = [(timestamp, v)]
else:
current_user_data.append((timestamp, v))
The question is: how to utilize the sorted order of hbase keys within Spark?
Upvotes: 4
Views: 731
Reputation: 753
I'm not super familiar with the guarantees you get with the way you're pulling data from HBase, but if I understand correctly, I can answer with just plain old Spark.
You've got some RDD[X]
. As far as Spark knows, the X
s in that RDD
are completely unordered. But you have some outside knowledge, and you can guarantee that the data is in fact grouped by some field of X
(and perhaps even sorted by another field).
In that case, you can use mapPartitions
to do virtually the same thing you did with hadoop streaming. That lets you iterate over all the records in one partition, so you can look for blocks of records w/ the same key.
val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr =>
var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
var currentUser: X = null
//itr is an iterator over *all* the records in one partition
itr.flatMap { x =>
if (currentUser != null && x.userId == currentUser.userId) {
// same user as before -- add the data to our list
currentUserData += x
None
} else {
// its a new user -- return all the data for the old user, and make
// another buffer for the new user
val userDataGrouped = currentUserData
currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
currentUserData += x
currentUser = x
Some(userDataGrouped)
}
}
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle. Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 }
I realize you wanted to use python -- sorry I figure I'm more likely to get the example correct if I write in Scala. and I think the type annotations make the meaning more clear, but that it probably a Scala bias ... :). In any case, hopefully you can understand what is going on and translate it. (Don't worry too much about flatMap
& Some
& None
, probably unimportant if you understand the idea ...)
Upvotes: 2