maddie

Reputation: 629

Can SparkContext and StreamingContext co-exist in the same program?

I am trying to set up Spark Streaming code that reads lines from a Kafka server but processes them using rules written in another local file. I am creating a StreamingContext for the streaming data and a SparkContext for applying all other Spark features - like string manipulation, reading local files, etc.

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ReadLine")
val ssc = new StreamingContext(sparkConf, Seconds(15))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val sentence = lines.toString

val conf = new SparkConf().setAppName("Bi Gram").setMaster("local[2]")
val sc = new SparkContext(conf)
val stringRDD = sc.parallelize(Array(sentence))

But this throws the following error:

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)

Upvotes: 11

Views: 5828

Answers (2)

imran

Reputation: 111

Yes, you can do it. First start a Spark session, and

then use its context to create the StreamingContext:

val spark = SparkSession.builder().appName("someappname").
config("spark.sql.warehouse.dir",warehouseLocation).getOrCreate()

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

Simple!!!

Upvotes: 1

Rockie Yang

Reputation: 4925

One application can have only ONE SparkContext. A StreamingContext is created on top of a SparkContext, so you just need to create the StreamingContext from the existing SparkContext:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(15))

If you use the following constructor:

StreamingContext(conf: SparkConf, batchDuration: Duration)

it internally creates another SparkContext:

this(StreamingContext.createNewSparkContext(conf), null, batchDuration)

The SparkContext can be obtained from the StreamingContext via

ssc.sparkContext
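
Applied to the code in the question, a minimal sketch might look as follows - one SparkContext shared by the StreamingContext and any batch-style RDD work. The names `topics`, `numThreads`, `zkQuorum`, and `group` are assumed to be defined elsewhere, as in the original program:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ReadLine")
val sc = new SparkContext(sparkConf)            // the one and only SparkContext
val ssc = new StreamingContext(sc, Seconds(15)) // reuses sc instead of creating a second context
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

// Batch-style operations on each micro-batch use the same underlying SparkContext
lines.foreachRDD { rdd =>
  val words = rdd.flatMap(_.split(" "))
  words.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()
```

Note that `lines.toString` from the question would only give the string representation of the DStream object, not its contents; processing each batch inside `foreachRDD` (as above) is the usual way to work with the streamed data.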

Upvotes: 17
