Reputation: 15
I have data coming as a stream from a Kafka topic. My DataFrame has an "Average" column, and I want to create a new column by performing a calculation on the "Average" column.
Currently I have written something like -
rdd_get_chills = df_avg_tmp.rdd.map(lambda line:get_wind_chills(line))
But it's throwing an error -
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
Upvotes: 0
Views: 1798
Reputation: 626
You can combine withColumn() and a custom UDF to add a new column over a streaming DataFrame. The reason you are getting the error is that you are trying to perform batch-like operations (rdd.map) on a streaming DataFrame. If you do want batch-style operations, you can use the foreachBatch() functionality of Structured Streaming (see the sketch below) - https://docs.databricks.com/structured-streaming/foreach.html
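If you do need batch semantics, a minimal foreachBatch() sketch could look like this (the console print and the process_batch name are illustrative; df_avg_tmp and get_wind_chills come from the question):

def process_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a regular (non-streaming) DataFrame,
    # so batch operations such as rdd.map are allowed here.
    rdd_get_chills = batch_df.rdd.map(lambda line: get_wind_chills(line))
    print(batch_id, rdd_get_chills.count())

query = df_avg_tmp.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()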
You can try something like the below to add a column on a streaming DataFrame (in PySpark, since that is what the question uses):
from pyspark.sql.functions import col, udf
get_wind_chills_udf = udf(get_wind_chills, "double")
incomingStreamDF.withColumn("newlyCalculatedColumnName", get_wind_chills_udf(col("Average")))
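For completeness, here is a minimal end-to-end sketch (df_avg_tmp is the streaming DataFrame from the question; the wind-chill formula, the "wind_chills" column name, and the console sink are assumptions for illustration):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Hypothetical calculation; replace with your actual wind-chill formula.
def get_wind_chills(avg):
    return avg - 5.0 if avg is not None else None

get_wind_chills_udf = udf(get_wind_chills, DoubleType())

df_with_chills = df_avg_tmp.withColumn("wind_chills", get_wind_chills_udf(col("Average")))

# Streaming queries must be started with writeStream.start();
# this is exactly what the AnalysisException is pointing at.
query = df_with_chills.writeStream.format("console").start()
query.awaitTermination()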
Upvotes: 1