Reputation: 1428
I get data from Kafka using PySpark Structured Streaming, and the result is a dataframe. When I transform the dataframe to an RDD, it fails:
Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
This is the working version of the code:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
And this is the failing version:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
Why can't the dataframe be converted to an RDD, and what should I do when I want to transform a dataframe to an RDD in PySpark Structured Streaming?
Upvotes: 3
Views: 5673
Reputation: 31
If your Spark version is 2.4.0 or above, you can use the following alternative to work with each row of your dataframe:
def process_row(row):
    # customized method to work on each row of the dataframe rather than an RDD
    ...

query = df.writeStream.foreach(process_row).outputMode("update").start()
query.awaitTermination()
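Not part of this answer, but a closely related option in the same Spark 2.4.0+ API is foreachBatch, which hands your function a plain batch dataframe for every micro-batch; on that dataframe, .rdd is allowed. A minimal sketch, assuming the df from the question:

def process_batch(batch_df, batch_id):
    # batch_df is a regular (non-streaming) dataframe here,
    # so converting it to an RDD works
    words = batch_df.rdd.map(lambda row: row.value.split(" "))
    print(words.collect())

query = df.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()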
Upvotes: 3
Reputation: 4133
To perform specific actions over your dataframe fields you can use UDFs, or you can even create your own Spark custom transformers. But some dataframe operations, such as converting to an RDD, are not supported on streaming dataframes.
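For example, a minimal UDF sketch (assuming the streaming df from the question, with value already cast to a string) that reproduces the split the asker tried to do through rdd.map:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# split each message into words; mirrors x.value.split(" ") from the question
split_words = F.udf(lambda v: v.split(" ") if v is not None else [], ArrayType(StringType()))
df = df.withColumn("words", split_words(df["value"]))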
Upvotes: 1
Reputation: 11
Structured Streaming runs on the Spark SQL engine. Converting a streaming dataframe or dataset to an RDD is not supported.
Upvotes: 1
Reputation: 18013
This RDD aspect is simply NOT supported. RDDs are legacy, and Spark Structured Streaming is DF/DS based: a common abstraction whether streaming or batch.
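A minimal sketch (not from the original answer) of that common abstraction: the same transformation function applies unchanged to a batch dataframe or a streaming one, assuming value has been cast to a string as in the question.

from pyspark.sql import DataFrame, functions as F

def split_and_explode(df: DataFrame) -> DataFrame:
    # identical logic for batch and streaming dataframes
    return df.withColumn("word", F.explode(F.split(df["value"], " ")))

# batch:     split_and_explode(spark.read.format("kafka")...load())
# streaming: split_and_explode(spark.readStream.format("kafka")...load())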
Upvotes: 3