Reputation: 264
I am using Cloudera's CDH 5.12 quickstart VM with Spark 1.6, Kafka 0.10 (installed via yum), Python 2.6.6, and Scala 2.10.
Below is a simple Spark Streaming application that I am running. It takes events from Kafka and prints the word counts after a map-reduce step.
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Receiver-based Kafka stream via the given ZooKeeper quorum
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

    # Each record is a (key, value) pair; keep only the message value
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
When I submit the above code with the following command (local mode), it runs fine:
spark-submit --master local[2] --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>
But when I submit the same code with the following command (YARN), it does not work:
spark-submit --master yarn --deploy-mode client --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>
Here is the log generated when run on YARN (cut short; the logs may differ from the Spark settings mentioned above):
INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.134.143
ApplicationMaster RPC port: 0
queue: root.cloudera
start time: 1515766709025
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1515761416282_0010/
user: cloudera
40 INFO YarnClientSchedulerBackend: Application application_1515761416282_0010 has started running.
40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53694.
40 INFO NettyBlockTransferService: Server created on 53694
53 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
54 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56220 with 534.5 MB RAM, BlockManagerId(1, quickstart.cloudera, 56220)
07 INFO ReceiverTracker: Starting 1 receivers
07 INFO ReceiverTracker: ReceiverTracker started
07 INFO PythonTransformedDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: Slide time = 10000 ms
07 INFO KafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO KafkaInputDStream: Checkpoint interval = null
07 INFO KafkaInputDStream: Remember duration = 10000 ms
07 INFO KafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.KafkaInputDStream@7137ea0e
07 INFO PythonTransformedDStream: Slide time = 10000 ms
07 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO PythonTransformedDStream: Checkpoint interval = null
07 INFO PythonTransformedDStream: Remember duration = 10000 ms
07 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@de77734
10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.8 KB, free 534.5 MB)
10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.5 KB, free 534.5 MB)
20 INFO JobScheduler: Added jobs for time 1515766760000 ms
30 INFO JobScheduler: Added jobs for time 1515766770000 ms
40 INFO JobScheduler: Added jobs for time 1515766780000 ms
After this, the job just keeps repeating the "Added jobs for time ..." lines above (at the interval set by the streaming context) and never prints the Kafka stream, whereas the job on master local with the exact same code does.
Interestingly, it prints such a line every time a Kafka event occurs (the screenshot illustrating this, taken with increased Spark memory settings, is not reproduced here).
Note that:

- Data is in Kafka, and I can see it in the consumer console.
- I have also tried increasing the executor's memory (3g) and the network timeout (800s), but with no success (see the example submit command below).
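For reference, this is roughly how those settings were passed on submit (a sketch; the flag names are assumed, the values are the ones from the attempts above):

spark-submit --master yarn --deploy-mode client \
  --executor-memory 3g \
  --conf spark.network.timeout=800s \
  --jars /usr/lib/spark/lib/spark-examples.jar \
  testfile.py <ZKhostname>:2181 <kafka-topic>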
Upvotes: 3
Views: 1040
Reputation: 1415
It's possible that your ZooKeeper hostname is an alias that is not defined on the YARN nodes, or is not resolved on the YARN nodes for other reasons.
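One quick way to test that hypothesis (a minimal sketch, not part of the original answer) is to try resolving the ZooKeeper hostname on each YARN node with a short Python script:

from __future__ import print_function

import socket
import sys

# Run this on every YARN node, passing the same hostname you give to
# spark-submit (e.g. <ZKhostname>). If resolution fails only on the
# cluster nodes, name resolution is the likely culprit.
host = sys.argv[1]
try:
    print("%s resolves to %s" % (host, socket.gethostbyname(host)))
except socket.gaierror as e:
    print("%s does NOT resolve: %s" % (host, e))

In local mode the receiver connects to ZooKeeper from your machine; on YARN it connects from an executor node, which is why the same code can work locally yet hang on the cluster.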
Upvotes: 0
Reputation: 257
When in local mode, the application runs on a single machine and you get to see all the prints in the code. When run on a cluster, everything is distributed and runs on different machines/cores, so you will not be able to see those prints. Try to get the logs generated by Spark using the command yarn logs -applicationId <application ID>, as sketched below.
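If you don't have the application ID handy, the standard YARN CLI can list it first; with the ID visible in the log above, the commands would look like this (a sketch):

yarn application -list
yarn logs -applicationId application_1515761416282_0010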
Upvotes: 1
Reputation: 436
Can you see the application's stdout logs through the YARN ResourceManager UI?
Hope this helps.
Upvotes: 1