Reputation: 391
First, I created a build.sbt in a folder with the following contents:
val sparkVersion = "1.6.3"
scalaVersion := "2.10.5"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
libraryDependencies +="datastax" % "spark-cassandra-connector" % "1.6.3-s_2.10"
libraryDependencies +="org.apache.spark" %% "spark-sql" % "1.1.0"
Then, in the same folder where my "build.sbt" exists, I started the Spark shell in the following way:
>/usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1
These are the warnings shown while the Spark shell starts:
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67:
java.net.BindException: Address already in use
In the Spark shell I import the following packages:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
Then, in the Spark shell, I create a configuration in the following way:
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver").set("spark.driver.allowMultipleContexts", "true").setMaster("local");
After creating the configuration, I created a new Spark streaming context with it in the following way:
val ssc = new StreamingContext(conf, Seconds(10))
During the creation of the streaming context, the warnings shown above are raised again, along with other warnings, as shown below:
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
.
.
.
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67:
java.net.BindException: Address already in use
.
.
.
WARN SparkContext: Multiple running SparkContexts detected in the same JVM!
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
.
.
.
WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@616f1c2e
Then, using the created streaming context, I create a Kafka stream in the following way:
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", map("spark-topic" -> 5))
Then I print the stream and start the ssc in the following way:
kafkaStream.print()
ssc.start
After running the above commands in the shell, the stream starts, but it never prints any values, only scheduler information. The output that gets printed repeatedly is shown below:
17/08/18 10:01:30 INFO JobScheduler: Starting job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Finished job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Total delay: 0.003 s for time 1503050490000 ms (execution: 0.000 s)
17/08/18 10:01:30 INFO BlockRDD: Removing RDD 3 from persistence list
17/08/18 10:01:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[3] at createStream at <console>:39 of time 1503050490000 ms
17/08/18 10:01:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1503050470000 ms)
17/08/18 10:01:30 INFO InputInfoTracker: remove old batch metadata: 1503050470000 ms
17/08/18 10:01:30 INFO BlockManager: Removing RDD 3
17/08/18 10:01:40 INFO JobScheduler: Added jobs for time 1503050500000 ms
-------------------------------------------
Time: 1503050500000 ms
-------------------------------------------
Upvotes: 2
Views: 5450
Reputation: 597
Kind of late, but this could help somebody else. The Spark shell already instantiates a SparkContext, which is available as sc. So to create a StreamingContext, just pass the existing sc as an argument. Hope this helps!
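A minimal sketch of that suggestion, reusing the sc that spark-shell provides and the 10-second batch interval from the question:
import org.apache.spark.streaming.{Seconds, StreamingContext}

// reuse the SparkContext the shell already started instead of building a new
// SparkConf/SparkContext, so no second context (and no second bind on port 4040) is attempted
val ssc = new StreamingContext(sc, Seconds(10))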
Upvotes: 0
Reputation: 391
I was able to fix it by doing the following:
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", map("spark-topic" -> 5))
Wrong practice! The lowercase map is the reason why Spark couldn't fetch the stream.
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
Right practice! The topics argument must be a Scala Map (see the sketch after the output below).
17/08/18 10:01:30 INFO JobScheduler: Starting job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Finished job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Total delay: 0.003 s for time 1503050490000 ms (execution: 0.000 s)
17/08/18 10:01:30 INFO BlockRDD: Removing RDD 3 from persistence list
17/08/18 10:01:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[3] at createStream at <console>:39 of time 1503050490000 ms
17/08/18 10:01:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1503050470000 ms)
17/08/18 10:01:30 INFO InputInfoTracker: remove old batch metadata: 1503050470000 ms
17/08/18 10:01:30 INFO BlockManager: Removing RDD 3
17/08/18 10:01:40 INFO JobScheduler: Added jobs for time 1503050500000 ms
-------------------------------------------
Time: 1503050500000 ms
-------------------------------------------
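For context on why the capital-M Map matters: the last argument of KafkaUtils.createStream is a Map[String, Int] from topic name to the number of receiver threads for that topic. A minimal sketch of the corrected call, including pulling the message payload out of the (key, value) pairs the stream returns (topic name and thread count are the ones from the question):
// topic -> number of receiver threads consuming that topic
val topics = Map("spark-topic" -> 5)
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", topics)
// each record is a (key, message) pair; keep only the message payload
val messages = kafkaStream.map { case (_, value) => value }
messages.print()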
Upvotes: 0
Reputation: 877
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
It means that the port it needs is already in use. As a rule, port 4040 is used by the Spark Thrift Server, so try to stop the Thrift Server using stop-thriftserver.sh from the spark/sbin folder, or check what else is using this port and free it.
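If you cannot free the port, another option (assuming the conflict really is just another process holding 4040) is to move the application's web UI to a different port with the standard spark.ui.port setting. A minimal sketch for when you build the SparkConf yourself, as in the question:
import org.apache.spark.SparkConf

// bind the web UI to 4041 instead of the default 4040 so it does not
// collide with whatever is already listening there
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver").set("spark.ui.port", "4041")
When launching spark-shell itself, the same setting can be passed on the command line as --conf spark.ui.port=4041, alongside the cassandra host setting from the question.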
Upvotes: 1