littlely

Reputation: 1428

How to transform dataframes to rdds in structured streaming?

I read data from Kafka using PySpark Structured Streaming, and the result is a DataFrame. When I try to transform the DataFrame to an RDD, it fails:

Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

This is the working version of the code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

This is the failing version of the code:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

Why can't the DataFrame be converted to an RDD, and what should I do when I want to transform a DataFrame to an RDD in PySpark Structured Streaming?

Upvotes: 3

Views: 5673

Answers (4)

Premnath2187

Reputation: 31

If your Spark version is 2.4.0 or above, you can use the alternative below to work with each row of your DataFrame.

def process_row(row):
    # custom logic applied to each row of the streaming DataFrame
    pass

query = df.writeStream.foreach(process_row).outputMode("update").start()
query.awaitTermination()
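If you ultimately need RDD access, a related option (also available from Spark 2.4) is foreachBatch, which hands each micro-batch to your function as a static DataFrame; calling .rdd on that batch DataFrame is allowed. A minimal sketch, reusing the df and the word-splitting logic from the question:

def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame, so .rdd works here
    words = batch_df.rdd.flatMap(lambda row: row.value.split(" "))
    print(batch_id, words.collect())

query = df.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='30 seconds') \
    .start()

query.awaitTermination()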

Upvotes: 3

Emiliano Martinez

Reputation: 4133

To perform specific actions over your DataFrame fields you can use UDFs, or you can even create your own Spark custom transformers. But some DataFrame operations are not supported on streaming DataFrames, such as converting to an RDD.
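A rough sketch of the UDF route (the 'value' column name follows the question's code, and the splitting logic is only an example):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# a UDF that works on the 'value' column instead of dropping to an RDD
split_words = F.udf(lambda v: v.split(" ") if v else [], ArrayType(StringType()))

df = df.withColumn("words", split_words(df["value"]))
df = df.withColumn("word", F.explode(df["words"]))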

Upvotes: 1

pengyang_baidu

Reputation: 11

Structured Streaming runs on the Spark SQL engine. Conversion of a streaming DataFrame or Dataset to an RDD is not supported.

Upvotes: 1

Ged

Reputation: 18013

This RDD aspect is simply NOT supported. RDDs are legacy, and Spark Structured Streaming is DataFrame/Dataset based: a common abstraction whether streaming or batch.

Upvotes: 3
