Reputation: 183
I've created a Spark application in Python to stream Kafka messages, following the example in the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher). However, it shuts down before I get the chance to send any messages.
This is where the shutdown section begins in the output.
16/11/26 17:11:06 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 1********6, 58045)
16/11/26 17:11:06 INFO VerifiableProperties: Verifying properties
16/11/26 17:11:06 INFO VerifiableProperties: Property group.id is overridden to
16/11/26 17:11:06 INFO VerifiableProperties: Property zookeeper.connect is overridden to
16/11/26 17:11:07 INFO SparkContext: Invoking stop() from shutdown hook
16/11/26 17:11:07 INFO SparkUI: Stopped Spark web UI at http://192.168.1.16:4040
16/11/26 17:11:07 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/11/26 17:11:07 INFO MemoryStore: MemoryStore cleared
16/11/26 17:11:07 INFO BlockManager: BlockManager stopped
16/11/26 17:11:07 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/26 17:11:07 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/11/26 17:11:07 INFO SparkContext: Successfully stopped SparkContext
16/11/26 17:11:07 INFO ShutdownHookManager: Shutdown hook called
16/11/26 17:11:07 INFO ShutdownHookManager: Deleting directory /private/var/folders/yn/t3pvrk7s231_11ff2lqr4jhr0000gn/T/spark-1876feee-9b71-413e-a505-99c414aafabf/pyspark-1d97c3dd-0889-42ed-b559-d0fd473faa22
16/11/26 17:11:07 INFO ShutdownHookManager: Deleting directory /private/var/folders/yn/t3pvrk7s231_11ff2lqr4jhr0000gn/T/spark-1876feee-9b71-413e-a505-99c414aafabf
Is there a way to tell it to wait, or am I missing something?
Full code:
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "TwitterWordCount")
ssc = StreamingContext(sc, 1)
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["next"], {"metadata.broker.list": "localhost:9092"})
offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        # The % operator needs all four values wrapped in a tuple
        print("Printing! %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream\
    .transform(storeOffsetRanges)\
    .foreachRDD(printOffsetRanges)
And here's the command to run it in case that's helpful.
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 producer.py
Upvotes: 1
Views: 3142
Reputation: 672
For Scala, when submitting to YARN in cluster mode, I had to use awaitAnyTermination:
query.start()
sparkSession.streams.awaitAnyTermination()
This roughly follows the docs: see the Structured Streaming Guide, about halfway through the Quick Example.
Upvotes: 0
Reputation: 1044
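You also need to start the streaming context. Your script only defines the stream, so it reaches the end of the file, the driver exits, and the shutdown hook stops the SparkContext. Take a look at this example: http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example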
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
Upvotes: 1