nilesh1212

Reputation: 1655

Spark Streaming Kafka and HBase Config

I have a few questions on Spark Streaming with Kafka and HBase. Below is my Spark Streaming program; here I am using ZooKeeper configurations to connect to Kafka and HBase. Do we really need this configuration in the streaming code, or am I doing it wrong? If I am using a Hadoop distribution such as Hortonworks or Cloudera, there should be a way to configure Spark with Kafka and HBase so that my streaming code only takes parameters such as the Kafka topic and HBase table, with no ZooKeeper or other connection settings. If this can be done, can you please help me with the steps?

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSparkStream {
  def main(args: Array[String]): Unit = {
    // Broker list, ZooKeeper quorum and topic are currently hard-coded
    val arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
    val Array(broker, zk, topic) = arg

    val conf = new SparkConf()
      .setAppName("KafkaSparkStreamToHbase")
      .setMaster("local[2]") // .setMaster("yarn-client") on a cluster
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaConf = Map(
      "metadata.broker.list" -> broker,
      "zookeeper.connect" -> zk,
      "group.id" -> "kafka-spark-streaming-example",
      "zookeeper.connection.timeout.ms" -> "1000")

    /* Kafka integration with receiver */
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc, kafkaConf, Map(topic -> 1),
      StorageLevel.MEMORY_ONLY_SER).map(_._2)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

    wordCounts.foreachRDD(rdd => {
      // HBase connection details are hard-coded again for every batch
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
      hbaseConf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
      hbaseConf.set("hbase.master", "HOSTNAME:16000")
      hbaseConf.set("hbase.rootdir", "file:///tmp/hbase")

      val jobConf = new Configuration(hbaseConf)
      jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
      jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
      jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)

      // convert (defined elsewhere) maps each (word, count) pair to an
      // (ImmutableBytesWritable, Put) tuple for TableOutputFormat
      rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
    })

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Upvotes: 2

Views: 816

Answers (1)

Marco

Reputation: 4927

The way to go with HBase is to add your hbase-site.xml configuration file to the Spark classpath.
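With hbase-site.xml on the classpath, HBaseConfiguration.create() picks up the ZooKeeper quorum, master and rootdir on its own, so the per-batch conf.set calls for those keys can go away. A minimal sketch of what the foreachRDD block from the question could then look like (the table name stream_count and the convert helper are taken from the question's code; everything else is an assumption about your setup):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}

wordCounts.foreachRDD { rdd =>
  // No hbase.zookeeper.quorum / hbase.master / hbase.rootdir here:
  // the hbase-site.xml found on the classpath supplies them.
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
  hbaseConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
  hbaseConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
  hbaseConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
  // convert: (word, count) -> (ImmutableBytesWritable, Put), as in the question's code
  rdd.map(convert).saveAsNewAPIHadoopDataset(hbaseConf)
}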

For Kafka you can use https://github.com/typesafehub/config to load properties from custom configuration files. To make such a config file visible to your Spark job you have to (a short sketch follows the list):

  • set --driver-class-path <dir with the config file>
  • set --files <configuration file> to copy this file to each executor's working dir
  • set spark.executor.extraClassPath=./ to add each executor's working dir to its classpath
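A minimal sketch of how this might hang together, assuming a file named kafka.conf shipped with --files and picked up via the classpath settings above (the file name, keys and values below are made up for illustration):

import com.typesafe.config.ConfigFactory

// Submitted e.g. with:
//   spark-submit --driver-class-path /path/to/conf \
//     --files /path/to/conf/kafka.conf \
//     --conf spark.executor.extraClassPath=./ ...
//
// Hypothetical kafka.conf:
//   kafka {
//     brokers = "10.74.163.163:9092,10.74.163.154:9092"
//     topic   = "test_topic"
//     group   = "kafka-spark-streaming-example"
//   }
val config  = ConfigFactory.load("kafka")   // resolves kafka.conf from the classpath
val brokers = config.getString("kafka.brokers")
val topic   = config.getString("kafka.topic")
val groupId = config.getString("kafka.group")

val kafkaConf = Map(
  "metadata.broker.list" -> brokers,
  "group.id"             -> groupId)

This keeps all connection details out of the streaming code itself, which is what the question is after.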

Upvotes: 1
