Owais Ajaz

Reputation: 264

Spark streaming from Kafka returns result on local but Not working on Yarn

I am using Cloudera's CDH 5.12 QuickStart VM, Spark v1.6, Kafka v0.10 (installed via yum), Python 2.6.6 and Scala 2.10.

Below is the simple Spark application I am running. It takes events from Kafka and prints the word counts after a map-reduce step.

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    # Receiver-based Kafka stream: yields (key, message) pairs from the topic
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

When I submit the above code with the following command (local mode), it runs fine:

spark-submit --master local[2] --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>

But when I submit the same code with the following command (YARN, client mode), it doesn't work:

spark-submit --master yarn --deploy-mode client --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>

Here is the log generated when running on YARN (trimmed for brevity; the logs may differ slightly from the Spark settings mentioned above):

INFO Client: 
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.134.143
ApplicationMaster RPC port: 0
queue: root.cloudera
start time: 1515766709025
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1515761416282_0010/
user: cloudera

40 INFO YarnClientSchedulerBackend: Application application_1515761416282_0010 has started running.
40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53694.
40 INFO NettyBlockTransferService: Server created on 53694
53 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
54 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56220 with 534.5 MB RAM, BlockManagerId(1, quickstart.cloudera, 56220)
07 INFO ReceiverTracker: Starting 1 receivers
07 INFO ReceiverTracker: ReceiverTracker started
07 INFO PythonTransformedDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: Slide time = 10000 ms
07 INFO KafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO KafkaInputDStream: Checkpoint interval = null
07 INFO KafkaInputDStream: Remember duration = 10000 ms
07 INFO KafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.KafkaInputDStream@7137ea0e
07 INFO PythonTransformedDStream: Slide time = 10000 ms
07 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO PythonTransformedDStream: Checkpoint interval = null
07 INFO PythonTransformedDStream: Remember duration = 10000 ms
07 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@de77734

10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.8 KB, free 534.5 MB)
10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.5 KB, free 534.5 MB)
20 INFO JobScheduler: Added jobs for time 1515766760000 ms
30 INFO JobScheduler: Added jobs for time 1515766770000 ms
40 INFO JobScheduler: Added jobs for time 1515766780000 ms

After this, the job just keeps repeating the "Added jobs for time ..." lines above (at the interval set by the streaming context) and never prints out Kafka's stream, whereas the exact same code run with --master local does.

Interestingly, it does print a line every time a Kafka event occurs (the screenshot showing this, taken with increased Spark memory settings, is not included here).

Note that:

  1. Data is present in Kafka and I can see it in the consumer console.
  2. I have also tried increasing the executor memory (3g) and the network timeout (800s), but with no success; a sketch of how those settings could be applied is shown below.
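For reference, here is a minimal sketch of how those two settings could be applied from the PySpark code itself instead of on the spark-submit command line. This is illustrative only and assumes the standard Spark 1.6 configuration keys spark.executor.memory and spark.network.timeout:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Sketch only: equivalent of --executor-memory 3g plus a longer network timeout,
# set programmatically before the SparkContext is created.
conf = (SparkConf()
        .setAppName("PythonStreamingKafkaWordCount")
        .set("spark.executor.memory", "3g")
        .set("spark.network.timeout", "800s"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)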

Upvotes: 3

Views: 1040

Answers (3)

Artem Trunov

Reputation: 1415

It's possible that your <ZKhostname> is an alias that isn't defined on the YARN nodes, or that it isn't being resolved on the YARN nodes for some other reason.
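A minimal sketch of how to check this on any of the YARN nodes, using only the Python standard library (the host name and port below are placeholders for your actual ZooKeeper quorum address):

import socket

# Sketch: verify that the ZooKeeper host resolves and its port is reachable
# from this node. "quickstart.cloudera" and 2181 are placeholder values.
host, port = "quickstart.cloudera", 2181

try:
    ip = socket.gethostbyname(host)                          # DNS / /etc/hosts lookup
    print("%s resolves to %s" % (host, ip))
    s = socket.create_connection((host, port), timeout=5)    # TCP reachability
    s.close()
    print("port %d on %s is reachable" % (port, host))
except (socket.gaierror, socket.error) as e:
    print("lookup or connection failed: %s" % e)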

Upvotes: 0

Kiran Balakrishnan

Reputation: 257

In local mode the application runs on a single machine, so you get to see all the print output from the code. When it runs on a cluster, everything is distributed and runs on different machines/cores, so you will not be able to see that print output on the driver console. Try to get the logs generated by Spark using the command yarn logs -applicationId <application_id>.
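If digging through executor logs is inconvenient, one alternative sketch (assuming the counts DStream from the question and a placeholder HDFS path, not part of this answer) is to persist each batch so the results can be read back regardless of where the tasks ran:

# Sketch: write each word-count batch to HDFS so results are inspectable even
# when print output lands in executor logs. The path prefix is a placeholder.
counts.saveAsTextFiles("hdfs:///user/cloudera/wordcounts/batch")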

Upvotes: 1

ozlemg

Reputation: 436

Can you see the application's stdout logs through the YARN Resource Manager UI?

  1. Follow your YARN Resource Manager link (http://localhost:8088).
  2. Find your application in the running applications list and follow the application's link (http://localhost:8088/application_1396885203337_0003/).
  3. Open the "stdout : Total file length is xxxx bytes" link to see the log file in the browser.

Hope this helps.

Upvotes: 1
