Reputation: 353
I have to put multiple column families from a table in HBase into one Spark RDD. I am attempting this using the following code (question edited after the first answer):
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._
/** Spark job that loads an HBase table as an RDD of [[HBaseRead.Model]] and prints its row count. */
object HBaseRead {

  /**
   * Values of the "Shoes", "Clothes" and "T-shirts" qualifiers of one HBase row.
   *
   * `T-shirts` is not a legal Scala identifier, so the field is named `tShirts`.
   * Declared at object level rather than inside `main` so the class used in the
   * RDD closure does not depend on the method's local scope.
   */
  final case class Model(shoes: String, clothes: String, tShirts: String)

  /**
   * Configures Spark and HBase, creates the table if it is not available,
   * maps every row to a [[Model]] and prints the number of rows.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val tableName = "TableName"
    val columnFamily = "Category"

    // Kryo classes must be registered before the SparkContext is constructed:
    // the context works on its own copy of the configuration, so later changes
    // to sparkConf are not seen by it.
    val sparkConf = new SparkConf()
      .setAppName("HBaseRead")
      .setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val sc = new SparkContext(sparkConf)

    try {
      val conf = HBaseConfiguration.create()
      ////setting up required stuff
      System.setProperty("user.name", "hdfs")
      System.setProperty("HADOOP_USER_NAME", "hdfs")
      conf.set("hbase.master", "localhost:60000")
      conf.setInt("timeout", 120000)
      conf.set("hbase.zookeeper.quorum", "localhost")
      conf.set("zookeeper.znode.parent", "/hbase-unsecure")
      conf.set(TableInputFormat.INPUT_TABLE, tableName)

      val admin = new HBaseAdmin(conf)
      try {
        if (!admin.isTableAvailable(tableName)) {
          val tableDesc = new HTableDescriptor(tableName)
          // Declare the family the job reads from; a descriptor without any
          // column family is not a usable table definition.
          tableDesc.addFamily(new org.apache.hadoop.hbase.HColumnDescriptor(columnFamily))
          admin.createTable(tableDesc)
        }
      } finally {
        // Release the admin connection even if the availability check or creation fails.
        admin.close()
      }

      val hBaseRDD2 = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result]
      )

      val transformedRDD = hBaseRDD2.map { case (_, result) =>
        // Reads one qualifier of the column family as a string.
        // NOTE(review): presumably yields null when the row has no such cell - confirm.
        def cell(qualifier: String): String =
          Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)))

        Model(cell("Shoes"), cell("Clothes"), cell("T-shirts"))
      }

      val totalcount = transformedRDD.count()
      println(totalcount)
    } finally {
      // Shut the local Spark context down so the JVM can exit cleanly.
      sc.stop()
    }
  }
}
What I want to do is build a single RDD in which the values of the first row (and, later on, of subsequent rows) from these column families are combined into a single array. Any help would be appreciated. Thanks
Upvotes: 2
Views: 1018
Reputation: 8957
You can do it in a couple of ways: inside the RDD's
map you can get all the columns from the parent RDD (hBaseRDD2),
transform them, and return the result as another single RDD.
Or you can create a case class and map the columns to it.
For example:
// One field per column to extract. Parameter names must be distinct:
// repeating `column1` is a compile error.
case class Model(column1: String,
                 column2: String,
                 column3: String)

// Load the table as (row key, Result) pairs.
val hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

// Pull one column from each column family of every row into a single Model.
val transformedRDD = hBaseRDD2.map(tuple => {
  val result = tuple._2
  Model(
    Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("Columnname1"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("Columnname2"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf3"), Bytes.toBytes("Columnname3")))
  )
})
Upvotes: 1