AshrithGande

Reputation: 391

How to fetch Kafka Stream and print it in Spark Shell?

First, I created a build.sbt file in a folder as follows:

val sparkVersion = "1.6.3"
scalaVersion := "2.10.5"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
libraryDependencies += "datastax" % "spark-cassandra-connector" % "1.6.3-s_2.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0"

Then, from the same folder where my "build.sbt" exists, I started the Spark shell as follows:

 >/usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1

These are the warnings shown while the Spark shell starts:


WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67: java.net.BindException: Address already in use
java.net.BindException: Address already in use

In the Spark shell I am importing the following packages:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

Then, in the Spark shell, I create a configuration as follows:

val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver").set("spark.driver.allowMultipleContexts", "true").setMaster("local");

After creating the configuration and assigning it, I created a new Spark streaming context as follows:

val ssc = new StreamingContext(conf, Seconds(10))

During the creation of the streaming context, the warnings shown above were raised again, along with other warnings, as shown below:


WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
.
.
.
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67: java.net.BindException: Address already in use
java.net.BindException: Address already in use
.
.
.
WARN SparkContext: Multiple running SparkContexts detected in the same JVM!
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
.
.
.
WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@616f1c2e

Then, using the created streaming context, I created a Kafka stream as follows:

val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", map("spark-topic" -> 5))

Then I print the stream and start the streaming context as follows:

kafkaStream.print()
ssc.start

After running the above commands in the shell, the output is as shown in the images below:

Image 1 - earlier part of the output

Image 2 - after fetching the jars and connecting to Zookeeper

Image 3 - issues that came up in Zookeeper

Later, the stream keeps running without printing any values, only batch information, as shown in the final output image. The output that gets printed repeatedly is shown below:


17/08/18 10:01:30 INFO JobScheduler: Starting job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms                             
17/08/18 10:01:30 INFO JobScheduler: Finished job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms                             
17/08/18 10:01:30 INFO JobScheduler: Total delay: 0.003 s for time 1503050490000 ms (execution: 0.000 s)                                             
17/08/18 10:01:30 INFO BlockRDD: Removing RDD 3 from persistence list                                                                                
17/08/18 10:01:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[3] at createStream at <console>:39 of time 1503050490000 ms                
17/08/18 10:01:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1503050470000 ms)                                                          
17/08/18 10:01:30 INFO InputInfoTracker: remove old batch metadata: 1503050470000 ms                                                                 
17/08/18 10:01:30 INFO BlockManager: Removing RDD 3                                                                                                  
17/08/18 10:01:40 INFO JobScheduler: Added jobs for time 1503050500000 ms                                                                            
-------------------------------------------                                                                                                          
Time: 1503050500000 ms                                                                                                                               
-------------------------------------------

Upvotes: 2

Views: 5450

Answers (3)

Jorge Cespedes

Reputation: 597

Kind of late, but this could help somebody else. The Spark shell already instantiates a SparkContext, which is available as sc. So to create the StreamingContext, just pass the existing sc as an argument. Hope this helps!
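
A minimal sketch of that suggestion (assuming a 10-second batch interval and the same imports as in the question):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse the SparkContext that spark-shell already created (available as sc)
// instead of building a second one from a new SparkConf.
val ssc = new StreamingContext(sc, Seconds(10))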

Upvotes: 0

AshrithGande

Reputation: 391

I was able to fix it by doing the following things:

  1. Take care of case sensitivity, because Scala is a case-sensitive language. In the code below, map() was used instead of Map():

val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", map("spark-topic" -> 5)) - wrong practice! That is the reason why Spark couldn't fetch your stream.

val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5)) - right practice!

  2. Check whether a producer is actually streaming to the Kafka topic mentioned in the Map (see the sketch after this list). Spark cannot fetch the stream from that topic either when the Kafka producer streaming data to it has stopped or when the streamed data has finished; in that case Spark starts removing RDDs from the array buffer and displays messages like the ones below:

17/08/18 10:01:30 INFO JobScheduler: Starting job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms                             
17/08/18 10:01:30 INFO JobScheduler: Finished job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms                             
17/08/18 10:01:30 INFO JobScheduler: Total delay: 0.003 s for time 1503050490000 ms (execution: 0.000 s)                                             
17/08/18 10:01:30 INFO BlockRDD: Removing RDD 3 from persistence list                                                                                
17/08/18 10:01:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[3] at createStream at <console>:39 of time 1503050490000 ms                
17/08/18 10:01:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1503050470000 ms)                                                          
17/08/18 10:01:30 INFO InputInfoTracker: remove old batch metadata: 1503050470000 ms                                                                 
17/08/18 10:01:30 INFO BlockManager: Removing RDD 3                                                                                                  
17/08/18 10:01:40 INFO JobScheduler: Added jobs for time 1503050500000 ms                                                                            
-------------------------------------------                                                                                                          
Time: 1503050500000 ms                                                                                                                               
-------------------------------------------

  3. Follow the comments and response of @YehorKrivokon and @VinodChandak to avoid the warnings faced.
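
For point 2, a quick way to verify that the producer is really sending data is to count the records in each batch. This is only a sketch of my own, assuming the same topic name, Zookeeper address and consumer group as above:

// Hypothetical check: if the count printed for every batch stays at 0,
// nothing is being written to "spark-topic".
val checkStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 1))
checkStream.count().print()
ssc.start()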

Upvotes: 0

Yehor Krivokon

Reputation: 877

WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use

It means that the port it needs is already in use. As a rule, port 4040 is used by the Spark Thrift Server, so try to stop the Thrift Server using stop-thriftserver.sh from the spark/sbin folder, or check what else is using this port and free it.
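
As a side note (my own assumption, not part of the original answer), the UI port can also be moved instead of freed, so that 4040 stays untouched:

// Sketch: run the driver UI on a different port via spark.ui.port.
val conf = new SparkConf()
  .setAppName("KafkaReceiver")
  .setMaster("local[2]")
  .set("spark.ui.port", "4041")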

Upvotes: 1
