sachin jagtap
sachin jagtap

Reputation: 15

How to add new column to streaming data frame in PySpark

I have data coming as a stream from kafka topic . In my dataframe I have "Average" column and I want to create new column by performing some calculation on "Average" column.

Currently I have written something like -

rdd_get_chills = df_avg_tmp.rdd.map(lambda line:get_wind_chills(line))

But it's throwing error -

pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

Upvotes: 0

Views: 1798

Answers (1)

Shane
Shane

Reputation: 626

you can combine withColumn() and custom UDF to add a new column over a streaming DF. The reason you are getting the error is because you are trying to perform batch like operations on a streaming dataframe. If you wish to perform batch operations - you can try using forEachBatch() functionality of structured streaming - https://docs.databricks.com/structured-streaming/foreach.html

You can try something like below to add a column on streaming Dataframe

import org.apache.spark.sql.functions
incomingStreamDF.withColumn("newlyCalculatedColumnName", UDF_Function_To_Perform_Calculation(col(average))

Upvotes: 1

Related Questions