Zhangxr
Zhangxr

Reputation: 13

py4j.protocol.Py4JavaError: An error occured while calling o22.start

I am now trying to put SparkStreaming and Kafka work together on Ubantu. But here comes the question.

I can make sure Kafka's working properly.

On the first terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties

On the second terminal:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest

then,I create some data:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
hello hadoop
hello spark

On the third terminal:

cd /usr/local/spark/mycode/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py localhost:2181 wordsendertest

Code of kafkaWordCount.py:

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys

if __name__ == "__main__":
   if len(sys.argv) != 3:
      print("usage:KafkaWordCount.py<zk><topic>",file=sys.stderr)
      exit(-1)
   sc = SparkContext(appName="PythonStreamingKafkaWordCount")
   ssc = StreamingContext(sc,1)
   zkQuorum,topic = sys.argv[1:]
   kvs = KafkaUtils.createStream(ssc,zkQuorum,"spark-streaming-consumer",{topic:1})
   lines = kvs.map(lambda x:x[1])
   counts = lines.flatMap(lambda x:x.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
   counts.pprint
   ssc.start()
   ssc.awaitTermination()

My error:

Traceback (most recent call last):
  File "/usr/local/spark/mycode/kafka/./KafkaWordCount.py", line 20, in <module>
    ssc.start()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 196, in start
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Please help me! Thank you!

Upvotes: 0

Views: 445

Answers (1)

s.polam
s.polam

Reputation: 10382

You forgot to add () in counts.pprint function.

Change counts.pprint to counts.pprint(), It will work.

Upvotes: 1

Related Questions