Reputation: 1655
I have a few questions about Spark Streaming with Kafka and HBase. Below is my Spark Streaming program; here I am using ZooKeeper configurations to connect to Kafka and HBase. Do we really need this configuration in the streaming code, or am I doing it wrong? If I am using a Hadoop distribution such as Hortonworks or Cloudera, there should be a provision to configure Spark with Kafka and HBase, so that my Spark Streaming code only takes parameters such as the Kafka topic and the HBase table, with no ZooKeeper or other configuration. If this can be done, can you please help me with the steps?
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaSparkStream {
  def main(args: Array[String]): Unit = {
    // Hard-coded here instead of reading from args: brokers, ZooKeeper, topic
    val arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
    val Array(broker, zk, topic) = arg
    val conf = new SparkConf()
      .setAppName("KafkaSparkStreamToHbase")
      .setMaster("local[2]")
      //.setMaster("yarn-client")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaConf = Map("metadata.broker.list" -> broker,
      "zookeeper.connect" -> zk,
      "group.id" -> "kafka-spark-streaming-example",
      "zookeeper.connection.timeout.ms" -> "1000")
    /* Kafka integration with receiver */
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc, kafkaConf, Map(topic -> 1),
      StorageLevel.MEMORY_ONLY_SER).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreachRDD(rdd => {
      // HBase connection settings are set by hand for every batch
      val conf = HBaseConfiguration.create()
      conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
      conf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
      conf.set("hbase.master", "HOSTNAME:16000")
      conf.set("hbase.rootdir", "file:///tmp/hbase")
      val jobConf = new Configuration(conf)
      jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
      jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
      jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
      //rdd.saveAsNewAPIHadoopDataset(jobConf)
      // convert is not shown in this post
      rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
    })
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Upvotes: 2
Views: 816
Reputation: 4927
The way to go with HBase is to add your hbase-site.xml configuration file to the Spark classpath.
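As a rough sketch of what this buys you: HBaseConfiguration.create() reads hbase-default.xml and hbase-site.xml from the classpath, so once the file is visible to the driver and executors, the streaming code only has to name the output table. The object and method names below (HBaseOutputConf, forTable) are made up for illustration and are not part of any library.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

// Hypothetical helper: the name and shape are assumptions for illustration.
object HBaseOutputConf {
  // Connection settings such as hbase.zookeeper.quorum are expected to come
  // from hbase-site.xml on the classpath, so only the table name is set here.
  def forTable(table: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, table)
    conf
  }
}
```

With something like this, the conf.set calls for hbase.zookeeper.quorum, hbase.master and hbase.rootdir in the question's foreachRDD block could be dropped, assuming the hbase-site.xml shipped by the distribution already contains them.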
For Kafka you can use https://github.com/typesafehub/config to load properties from custom configuration files. To work with these config files you have to set:
--driver-class-path <dir with the config file>
--files <configuration file> to copy this file to each executor's working dir
spark.executor.extraClassPath=./ to add each executor's working dir to its classpath
Upvotes: 1
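For the Kafka part of this answer, a minimal sketch of reading the connection settings through Typesafe Config could look like the following. The key names (kafka.brokers, kafka.zookeeper, kafka.groupId) and the KafkaSettings object are assumptions chosen for the example; use whatever layout your own configuration file has.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper: key names under "kafka" are assumptions for illustration.
object KafkaSettings {
  // Builds the parameter map that KafkaUtils.createStream takes
  // from the "kafka" section of a Typesafe Config object.
  def kafkaParams(config: Config): Map[String, String] = {
    val kafka = config.getConfig("kafka")
    Map(
      "metadata.broker.list" -> kafka.getString("brokers"),
      "zookeeper.connect" -> kafka.getString("zookeeper"),
      "group.id" -> kafka.getString("groupId")
    )
  }
}
```

In the streaming job this would be called as KafkaSettings.kafkaParams(ConfigFactory.load()), since ConfigFactory.load() picks up application.conf from the classpath. That is why the config file has to be on the classpath of both the driver and the executors.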