Reputation: 11
I am kind of stuck. I read a Kafka topic using the Spark Structured Streaming API in pyspark and then try to push it to a sink, either the console or another Kafka topic, but the whole process hangs and does not output anything. I checked that there are messages on the topic, and I am able to read and continue if I use a Java-based consumer, but somehow pyspark is not able to consume and output the messages. I put the code in a Zeppelin notebook too; it is below. I would appreciate it if someone could have a quick look and suggest what I am doing wrong.
%pyspark
def foreach_function(df, epoch_id):
    print("I am here")
    #pass

from pyspark.sql.types import StructType, StructField, TimestampType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import *

schema = StructType([
    StructField("orderId", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("order_VALUE", DoubleType(), True),
    StructField("sku", StringType(), True),
    StructField("sales_DATE", StringType(), True)
])

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kafka.topic.orders") \
    .option("startingOffsets", "latest") \
    .load()
df.printSchema()

dataDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
dataDF.printSchema()

orderDF = dataDF.select(from_json(col("value"), schema)).alias("data").select("data.*")
orderDF.printSchema()

orderDF.writeStream.outputMode("append").format("console").option("checkpointLocation", "/test/chkpt").start().awaitTermination()
Error
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
root
|-- from_json(value): struct (nullable = true)
| |-- orderId: string (nullable = true)
| |-- quantity: integer (nullable = true)
| |-- order_VALUE: double (nullable = true)
| |-- sku: string (nullable = true)
| |-- sales_DATE: string (nullable = true)
Fail to execute line 31: orderDF.writeStream.outputMode("append").format("console").option("checkpointLocation", "/test/chkpt").start().awaitTermination()
Traceback (most recent call last):
File "/tmp/1625186594615-0/zeppelin_python.py", line 158, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 31, in <module>
File "/Users/test/software/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/streaming.py", line 103, in awaitTermination
return self._jsq.awaitTermination()
File "/Users/test/software/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/Users/test/software/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 137, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 85d72b5f-f1f5-4ad3-a8b4-cb986576ced2, runId = 229fed09-0c60-4eae-a296-7fbebb46f4d6]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[kafka.topic.orders]]: {"kafka.topic.orders":{"2":34,"1":31,"0":35}}}
Upvotes: 1
Views: 292
Reputation: 11
I was able to make it work. I tried the script outside Zeppelin using spark-submit, and it worked after I added the commons-pool2 jar (org.apache.commons:commons-pool2:2.10.0). I was able to see the full stack trace using native spark-submit. Thank you all.
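For anyone hitting the same missing-dependency issue, a sketch of supplying the jar at submit time rather than installing it manually (the commons-pool2 coordinates are from the answer above; the connector version matching the Spark 3.0.0 / Scala 2.12 build shown in the traceback is an assumption, and `my_streaming_job.py` is a hypothetical script name):

```shell
# --packages resolves the artifacts from Maven Central and puts them on
# both the driver and executor classpaths at launch.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.commons:commons-pool2:2.10.0 \
  my_streaming_job.py
```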
Upvotes: 0
Reputation: 780
Maybe see if this helps first: pySpark Structured Streaming from Kafka does not output to console for debugging
And I would try with just this:
writeStream \
    .format("console") \
    .start().awaitTermination()
Also double check whether messages are actually getting produced after you start your consumer (the pyspark job above), since you have the "latest" startingOffsets setting.
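To rule out the "latest" issue quickly, one option is to point the reader at the beginning of the topic so that messages produced before the query started are also picked up. A minimal sketch of the question's source configuration with only the offsets option changed (`spark` is the session provided by Zeppelin or spark-submit; this fragment still needs a running Kafka broker to do anything):

```python
# Same Kafka source as in the question, but consuming from the
# earliest available offsets instead of only new messages.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kafka.topic.orders") \
    .option("startingOffsets", "earliest") \
    .load()
```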
Upvotes: 1