I am trying to get Spark Streaming and Kafka working together on Ubuntu, but I have run into a problem.
I have verified that Kafka itself is working properly.
On the first terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
On the second terminal:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
Then I produce some test data:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
hello hadoop
hello spark
On the third terminal:
cd /usr/local/spark/mycode/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py localhost:2181 wordsendertest
The code of kafkaWordCount.py:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("usage:KafkaWordCount.py<zk><topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda x: x.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint
    ssc.start()
    ssc.awaitTermination()
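Leaving Kafka and streaming aside, the transformation chain in this script is a plain word count applied to each 1-second batch. The pure-Python sketch below mirrors the flatMap/map/reduceByKey steps on the two sample messages produced earlier, assuming both arrive in the same batch. It does not use Spark at all, and the function name count_words is hypothetical.

```python
from collections import defaultdict


def count_words(lines):
    """Hypothetical helper mimicking flatMap(split) -> map((word, 1)) -> reduceByKey(add)."""
    # flatMap: split every line on spaces and flatten into one list of words
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the 1s for each distinct word
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    return dict(totals)


print(count_words(["hello hadoop", "hello spark"]))
# {'hello': 2, 'hadoop': 1, 'spark': 1}
```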
My error:
Traceback (most recent call last):
  File "/usr/local/spark/mycode/kafka/./KafkaWordCount.py", line 20, in <module>
    ssc.start()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 196, in start
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o22.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
Please help me! Thank you!
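You forgot to add () to counts.pprint. Without the parentheses, Python only looks up the method and never calls it, so no output operation is registered on the DStream and ssc.start() fails with "No output operations registered". Change counts.pprint to counts.pprint() and it will work.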
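To illustrate why the missing parentheses matter: in Spark Streaming, calling an output operation such as pprint() is what registers it with the StreamingContext, and start() refuses to run when none are registered. The sketch below imitates that behaviour with hypothetical FakeStreamingContext and FakeDStream classes; they are stand-ins for illustration only, not Spark classes.

```python
class FakeStreamingContext:
    """Hypothetical stand-in that tracks registered output operations."""

    def __init__(self):
        self.output_ops = []

    def start(self):
        # Spark performs a similar check (it raises IllegalArgumentException)
        if not self.output_ops:
            raise ValueError("No output operations registered, so nothing to execute")
        return "started"


class FakeDStream:
    """Hypothetical stand-in for a DStream."""

    def __init__(self, ssc):
        self.ssc = ssc

    def pprint(self):
        # Calling the method is what registers the output operation
        self.ssc.output_ops.append("pprint")


ssc = FakeStreamingContext()
counts = FakeDStream(ssc)

counts.pprint  # attribute lookup only: the method is never called
try:
    ssc.start()
except ValueError as err:
    print("before fix:", err)

counts.pprint()  # actual call: registers the output operation
print("after fix:", ssc.start())
```

The bare counts.pprint line is valid Python, which is why there is no syntax error; it simply evaluates to a bound method object that is immediately discarded.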