J.Doe

Reputation: 549

Adding a ROW_NUMBER column to a streaming dataframe

I'm fairly new to Spark and SQL. I am trying to add a column to my df (which I will then save to a Delta table) that gives a unique id to each record/row and increments it every time that specific record is updated.

I was trying to do the following:

SELECT etc,
CONCAT(somerows1) as id1,
ROW_NUMBER() OVER(PARTITION BY somerows1 ORDER BY (SELECT NULL)) AS versionid
FROM etc

somerows1 is the concatenation of several columns that together identify a unique record. I have no particular interest in the rows being ordered in any particular way, which is why I chose ORDER BY (SELECT NULL).

I get the following error:

Error in SQL statement: AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

Does anyone have any idea on how to fix this?

Thanks

Upvotes: 3

Views: 3962

Answers (2)

Vhon Newmahn

Reputation: 121

I have solved this problem by using the foreachBatch sink on the .writeStream. This lets you define a function in which the streaming dataframe is treated like a static/batch dataframe (the function is applied to each micro-batch).

In Scala the code would look something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{row_number, lit}

val saveWithWindowFunction = (sourceDf: DataFrame, batchId: Long) => {
  // Inside foreachBatch each micro-batch is a static DataFrame,
  // so a non-time-based window function is allowed here.
  val windowSpec = Window
    .partitionBy("somerows1")
    .orderBy(lit(null))

  val withVersionId = sourceDf
    .withColumn("versionid", row_number().over(windowSpec))

  // ... save the dataframe using: withVersionId.write.save()
}

With the .writeStream calling your function (foreachBatch itself acts as the sink, so the Delta write happens inside the function rather than via a format):

  .writeStream
  .foreachBatch(saveWithWindowFunction)
  .start()
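
Put together, a minimal end-to-end sketch (df stands for a streaming DataFrame read earlier, and the checkpoint path is a placeholder):

val query = df.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/versionid") // placeholder path
  .foreachBatch(saveWithWindowFunction)
  .start()

query.awaitTermination()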

Upvotes: 7

jayrythium

Reputation: 767

What you're looking for is aggregations over a sliding event-time window. Check the documentation and examples in the Structured Streaming programming guide's section on window operations on event time.
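
For reference, a minimal sketch of such a windowed aggregation, assuming a streaming DataFrame streamingDf with an event-time column eventTime (both names are placeholders):

import org.apache.spark.sql.functions.{window, col}

// Count records per key over 10-minute event-time windows sliding every 5 minutes;
// the watermark lets Spark discard state for windows older than 10 minutes.
val windowedCounts = streamingDf
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window(col("eventTime"), "10 minutes", "5 minutes"),
    col("somerows1"))
  .count()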

Upvotes: 0
