Reputation: 31
I am trying to generate some random data with a Kafka producer written in Java in the Eclipse IDE. I receive the same data in a Kafka consumer, which is also written in Java in the same IDE. My work depends on streaming data, so I need Spark Streaming to receive the random data generated by Kafka. For Spark Streaming, I am using Python code in a Jupyter notebook. To integrate Kafka with Spark, the "spark-streaming-kafka-0-10_2.12-3.0.0.jar" file has to be added to Spark's jars. I also tried adding the jar file in PySpark. Here is my Spark code:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
n_secs = 3
topic = "generate"
spark = SparkSession.builder.master("local[*]") \
    .appName("kafkaStreaming") \
    .config("/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.12-3.0.0.jar") \
    .getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, n_secs)
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers':'localhost:9092',
    'group.id':'test-group',
    'auto.offset.reset':'latest'})
lines = kStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
print(words)
ssc.start()
time.sleep(100)
ssc.stop(stopSparkContext=True,stopGraceFully=True)
In the above code, I add the jar file using the SparkSession.builder.config() method. After creating the StreamingContext, I try to receive data from Kafka using KafkaUtils.createDirectStream(), passing the topic name, bootstrap servers and so on. After that, I take the message values from the stream, split them into words and print the result. This is the overall flow of my work. First, I run the Kafka producer code in Java; it generates some data, which is consumed by the Kafka consumer. Up to this point, everything works properly. But when I execute the Spark Streaming code in Python, it shows an error like this:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Py4JError Traceback (most recent call last)
<ipython-input-17-873ece723182> in <module>
36 'bootstrap.servers':'localhost:9092',
37 'group.id':'test-group',
---> 38 'auto.offset.reset':'latest'})
39
40 lines = kStream.map(lambda x: x[1])
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
144 func = funcWithoutMessageHandler
145 jstream = helper.createDirectStreamWithoutMessageHandler(
--> 146 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
147 else:
148 ser = AutoBatchedSerializer(PickleSerializer())
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
334 raise Py4JError(
335 "An error occurred while calling {0}{1}{2}".
--> 336 format(target_id, ".", name))
337 else:
338 type = answer[1]
Py4JError: An error occurred while calling o270.createDirectStreamWithoutMessageHandler
Can anyone help me resolve this issue?
Upvotes: 1
Views: 720
Reputation: 780
There are a few things I can see from the code itself:
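Add the jar when Spark is launched, using the --jars option (e.g. spark-submit --jars <jar-file-path>), rather than through SparkSession.builder.config(). config() expects a key and a value (for jars the key is spark.jars), so passing only the path does not add the jar.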
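In a Jupyter notebook there is no command line to add --jars to. One common workaround, sketched here under the assumption that nothing in the kernel has created a SparkContext yet, is to set the PYSPARK_SUBMIT_ARGS environment variable, which PySpark reads when it launches the JVM; its value has to end with pyspark-shell. The helper name pyspark_submit_args and the jar path below are hypothetical, for illustration only.

```python
import os
import shlex
from typing import Iterable


def pyspark_submit_args(jars: Iterable[str] = (), packages: Iterable[str] = ()) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that passes jars/packages to spark-submit.

    The value has to end with 'pyspark-shell'. Each part is shell-quoted so a
    path containing spaces survives shell-style splitting.
    """
    jars, packages = list(jars), list(packages)
    parts = []
    if jars:
        parts += ["--jars", ",".join(jars)]  # --jars takes a comma-separated list
    if packages:
        parts += ["--packages", ",".join(packages)]  # Maven coordinates
    parts.append("pyspark-shell")
    return " ".join(shlex.quote(part) for part in parts)


if __name__ == "__main__":
    # Run this before anything creates a SparkContext/SparkSession in the kernel;
    # once the JVM is running, the variable is not read again.
    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args(
        jars=["/path/to/kafka-connector.jar"]  # hypothetical path
    )
    print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

After that cell, the SparkSession can be built as in the question without the config() call. The design choice is to move jar loading to JVM start-up, which is the only time spark-submit options are applied.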
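Also, print(words) only prints the DStream object itself, not the incoming records. Use pprint() to print the first elements of each batch, and awaitTerminationOrTimeout() instead of time.sleep():
# ssc, topic and KafkaUtils are created as in the question
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers':'localhost:9092',
    'group.id':'test-group',
    'auto.offset.reset':'latest'})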
kStream.pprint()
ssc.start()
# stream will run for 50 sec
ssc.awaitTerminationOrTimeout(50)
ssc.stop()
sc.stop()
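One more thing worth checking is the jar itself. The question uses spark-2.4.6-bin-hadoop2.7, whose prebuilt binaries use Scala 2.11 (an assumption worth confirming with spark-submit --version), while spark-streaming-kafka-0-10_2.12-3.0.0.jar targets Spark 3.0.0 and Scala 2.12. Also, in Spark 2.4 the Python pyspark.streaming.kafka module belongs to the Kafka 0-8 integration, so it expects the spark-streaming-kafka-0-8 artifact rather than 0-10. The sketch below only checks the version mismatch from the file name, assuming the usual <artifact>_<scala-version>-<version>.jar naming; parse_jar_name and check_jar are hypothetical helpers.

```python
import re
from typing import List, NamedTuple, Optional


class JarInfo(NamedTuple):
    artifact: str      # e.g. "spark-streaming-kafka-0-10"
    scala_binary: str  # e.g. "2.12"
    version: str       # e.g. "3.0.0"


# Assumed naming convention: <artifact>_<scala binary version>-<version>.jar
_JAR_NAME = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<version>\d+\.\d+\.\d+)\.jar$"
)


def parse_jar_name(filename: str) -> Optional[JarInfo]:
    """Split a connector jar file name into artifact, Scala suffix and version."""
    match = _JAR_NAME.match(filename)
    if match is None:
        return None
    return JarInfo(match.group("artifact"), match.group("scala"), match.group("version"))


def _major_minor(version: str) -> str:
    """Keep only the major.minor part, e.g. '2.4.6' -> '2.4'."""
    return ".".join(version.split(".")[:2])


def check_jar(filename: str, spark_version: str, scala_binary: str) -> List[str]:
    """List mismatches between a jar file name and the installed Spark/Scala versions."""
    info = parse_jar_name(filename)
    if info is None:
        return [f"cannot parse jar name: {filename}"]
    problems = []
    if info.scala_binary != scala_binary:
        problems.append(
            f"jar is built for Scala {info.scala_binary}, Spark uses Scala {scala_binary}"
        )
    if _major_minor(info.version) != _major_minor(spark_version):
        problems.append(
            f"jar is built for Spark {info.version}, installed Spark is {spark_version}"
        )
    return problems


if __name__ == "__main__":
    # Versions assumed for spark-2.4.6-bin-hadoop2.7; confirm with `spark-submit --version`.
    for problem in check_jar("spark-streaming-kafka-0-10_2.12-3.0.0.jar", "2.4.6", "2.11"):
        print(problem)
```

For the jar in the question, this reports both a Scala mismatch and a Spark version mismatch. Comparing only major.minor versions is a simplification; the exact patch version usually matters less than the Scala suffix and the Spark line.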
Upvotes: 0