Autumn

Reputation: 129

How to write and update via the Kudu API in Spark 2.1

I want to write to and update Kudu tables via the Kudu API. These are the Maven dependencies:

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.1.0</version>
</dependency>

In the following code, I have no idea what parameter the KuduContext constructor expects.

My code in spark2-shell:

val kuduContext = new KuduContext("master:7051") 

The same error also occurs in Spark 2.1 Streaming:

import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
// plus the KafkaUtils import for whichever Kafka integration is in use

val sparkConf = new SparkConf().setAppName("DirectKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream("") // Kafka parameters elided
messages.foreachRDD(rdd => {
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // reading the table works
  val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu
  // creating the KuduContext fails
  val kuduContext = new KuduContext("master:7051")
})

Then the error:

org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)

Upvotes: 1

Views: 1818

Answers (1)

Shaido

Reputation: 28392

Update your version of Kudu to the latest one (currently 1.5.0). In later versions the KuduContext takes the SparkContext as an input parameter and reuses it instead of creating a new one, which should prevent this problem.
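
For reference, the upgraded dependencies would look roughly like this (assuming the 1.5.0 artifacts published to Maven Central):

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.5.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.5.0</version>
</dependency>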

Also, do the initial Spark setup outside of foreachRDD: in the code you provided, move both spark and kuduContext out of the foreach. You also do not need to create a separate SparkConf; the newer SparkSession builder alone is enough.

import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DirectKafka").master("local[*]").getOrCreate()
import spark.implicits._

// Create the KuduContext once, passing in the existing SparkContext
val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu

val messages = KafkaUtils.createDirectStream("") // Kafka parameters elided
messages.foreachRDD(rdd => {
  // do something with the bb table and messages
})
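
For the actual writing and updating the question asks about, the KuduContext exposes methods such as insertRows, upsertRows, and updateRows, each taking a DataFrame and a Kudu table name. A minimal sketch, assuming a hypothetical Message schema whose columns (including the primary key) match the existing "table", and relying on the spark.implicits._ import above:

case class Message(key: Long, value: String) // hypothetical schema matching the Kudu table

val df = Seq(Message(1L, "hello"), Message(2L, "world")).toDF()

kuduContext.insertRows(df, "table")  // insert new rows
kuduContext.upsertRows(df, "table")  // insert, or update if the key already exists
kuduContext.updateRows(df, "table")  // update existing rows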

Upvotes: 1
