Ravi Ranjan

Reputation: 353

Putting multiple column families from an HBase table into one Spark RDD

I have to put multiple column families from a table in HBase into a single Spark RDD. I am attempting this with the following code (question edited after the first answer):

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
object HBaseRead {
   def main(args: Array[String]) {
     val sparkConf = new SparkConf()
       .setAppName("HBaseRead")
       .setMaster("local")
       .set("spark.driver.allowMultipleContexts", "true")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     // register Kryo classes before creating the SparkContext; the context
     // clones the conf, so registering afterwards has no effect
     sparkConf.registerKryoClasses(Array(classOf[Result]))
     val sc = new SparkContext(sparkConf)
     val conf = HBaseConfiguration.create()  
     val tableName = "TableName"  

     // set up the HBase connection properties
     System.setProperty("user.name", "hdfs")        
     System.setProperty("HADOOP_USER_NAME", "hdfs")
     conf.set("hbase.master", "localhost:60000")
     conf.setInt("timeout", 120000)
     conf.set("hbase.zookeeper.quorum", "localhost")
     conf.set("zookeeper.znode.parent", "/hbase-unsecure")
     conf.set(TableInputFormat.INPUT_TABLE, tableName)
     val admin = new HBaseAdmin(conf)
     if (!admin.isTableAvailable(tableName)) {
          // a new table needs at least one column family; "Category" is the family read below
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("Category"))
          admin.createTable(tableDesc)
     }
     // a hyphenated name such as T-shirts must be wrapped in backticks to be a legal identifier
     case class Model(Shoes: String, Clothes: String, `T-shirts`: String)
     val hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
       classOf[ImmutableBytesWritable], classOf[Result])
     val transformedRDD = hBaseRDD2.map(tuple => {
         val result = tuple._2
         Model(Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("Shoes"))),
         Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("Clothes"))),
         Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("T-shirts")))
         )
     })
     val totalcount = transformedRDD.count()
     println(totalcount)
   }
}

What I want is a single RDD in which the values of the first row (and subsequent rows later on) from these column families are combined into a single array. Any help would be appreciated. Thanks

Upvotes: 2

Views: 1018

Answers (1)

Shankar

Reputation: 8957

You can do it a couple of ways. Inside the RDD's map you can read all the columns from the parent RDD (hBaseRDD2), transform them, and return each row as a single collection, as in the sketch below.
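A minimal sketch of that first approach, assuming the Category family and the column names from your question (getValue returns null for a missing cell, hence the Option guard):

val arrayRDD = hBaseRDD2.map(tuple => {
    val result = tuple._2
    // collect the three columns of this row into one array
    Array("Shoes", "Clothes", "T-shirts").map { col =>
      Option(result.getValue(Bytes.toBytes("Category"), Bytes.toBytes(col)))
        .map(Bytes.toString)
        .getOrElse("")   // empty string when the cell is absent
    }
})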

Or you can create a case class and map the columns to it.

For example:

case class Model(column1: String,
                 column2: String,
                 column3: String)

var hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
val transformedRDD = hBaseRDD2.map(tuple => {
    val result = tuple._2
    Model(Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("Columnname1"))),
      Bytes.toString(result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("Columnname2"))),
      Bytes.toString(result.getValue(Bytes.toBytes("cf3"), Bytes.toBytes("Columnname3")))
    )
})
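
Either way you end up with an ordinary RDD, so you can inspect a few rows directly to check the mapping, e.g.:

transformedRDD.take(5).foreach(println)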

Upvotes: 1
