Reputation: 11
I am currently running into an issue where I want to use a window with a sliding interval on my CSV data and, for each window, perform an aggregation to get the most common category. However, I do not have a timestamp column, and I want to perform the sliding window over the index column instead. Can anyone point me in the right direction on how to use windows + sliding intervals on the index?
In short, I want to create windows + sliding intervals over the index column.
Currently I have something like this:
from pyspark.sql.types import StructType

schema = StructType().add("index", "string").add(
"Category", "integer")
dataframe = spark \
.readStream \
.option("sep", ",") \
.schema(schema) \
.csv("./tmp/input")
# TODO perform Window + sliding interval on dataframe, then perform aggregation per window
aggr = dataframe.groupBy("Category").count().orderBy("count", ascending=False).limit(3)
query = aggr \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
Upvotes: 0
Views: 954
Reputation: 51
To aggregate data on a per-window basis you can use the window function from the pyspark.sql.functions package.
Since window works on a time interval, you need to add a timestamp column to your dataframe.
from pyspark.sql.functions import current_timestamp

# csvFile is the dataframe read from the CSV
newDf = csvFile.withColumn("TimeStamp", current_timestamp())
This code adds the current time to the dataframe as the data is read from the CSV.
from pyspark.sql.functions import window, col, sum as sum_

trimmedDf2 = newDf.groupBy(window(col("TimeStamp"), "5 seconds")) \
    .agg(sum_("value")) \
    .select("window.start", "window.end", "sum(value)")
display(trimmedDf2)  # display() is available in Databricks notebooks
The above code sums up the value column and groups the results into 5-second timestamp windows.
You can also use Weekly Aggregation using Windows Function in Spark for reference.
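Adapting this to your streaming code, a minimal sketch could look like the one below. It assumes the dataframe from your question, adds a processing-time timestamp with current_timestamp() (since the CSV has no event time), and counts rows per Category in a 10-second window sliding every 5 seconds; both durations are arbitrary examples. Picking the single most common category per window would need a further step (for example inside foreachBatch), which is not shown here.

from pyspark.sql.functions import current_timestamp, window, col

# Add a processing-time timestamp, since the CSV rows carry no event time.
withTs = dataframe.withColumn("TimeStamp", current_timestamp())

# Count rows per Category inside a 10-second window that slides every 5 seconds.
windowed = withTs.groupBy(
    window(col("TimeStamp"), "10 seconds", "5 seconds"),
    col("Category")
).count()

query = windowed \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

query.awaitTermination()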
Upvotes: 1