Reputation: 549
I'm fairly new to Spark and SQL. I am trying to add a column to my df (which I will then save to a Delta table) that gives a unique id to each record/row and increments it every time that specific record is updated.
I was trying to do the following:
SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc
somerows1 is the concatenation of several columns that together identify a unique record. I don't care about any particular ordering of the records, which is why I chose ORDER BY (SELECT NULL).
I get the following error:
Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;
Does anyone have any idea on how to fix this?
Thanks
Upvotes: 3
Views: 3962
Reputation: 121
I have solved this problem by using the foreachBatch sink on the .writeStream. This lets you define a function in which the streaming dataframe is treated like a static/batch dataframe (the function is applied to each micro-batch).
In Scala the code would look something like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}
val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {
  val windowSpec = Window
    .partitionBy("somerows1")
    .orderBy(lit(null))

  // add the versionid column to this micro-batch
  val withVersionId = sourceDf
    .withColumn("versionid", row_number().over(windowSpec))

  //... save the dataframe using: withVersionId.write.save()
}
Then have .writeStream call your function:
.writeStream
.format("delta")
.foreachBatch(saveWithWindowFunction)
.start()
Upvotes: 7
Reputation: 767
What you're looking for is aggregations over a sliding event-time window. Check the documentation and examples here.
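For reference, an event-time window aggregation on a streaming DataFrame would look roughly like this (a minimal sketch in Scala; the events DataFrame and its eventTime timestamp column are assumptions for illustration):
import org.apache.spark.sql.functions.{window, col}

// Hypothetical streaming DataFrame "events" with an "eventTime" timestamp column:
// count records per key over a 10-minute window sliding every 5 minutes
val windowedCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window(col("eventTime"), "10 minutes", "5 minutes"),
    col("somerows1"))
  .count()
This only applies if you actually want time-based windows; it does not produce the per-record version counter the question asks for.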
Upvotes: 0