Justin van Dongen

Reputation: 11

Pyspark data aggregation with Window and sliding interval on index

I am running into an issue where I want to apply a window with a sliding interval to my CSV and, for each window, perform an aggregation to find the most common category. However, I do not have a timestamp column, and I want the window to slide over the index column instead. Can anyone point me in the right direction on how to use windows with sliding intervals on the index?

In short, I want to create windows with sliding intervals over the index column.

Currently I have something like this:

from pyspark.sql.types import StructType

schema = StructType().add("index", "string").add("Category", "integer")

dataframe = spark \
    .readStream \
    .option("sep", ",") \
    .schema(schema) \
    .csv("./tmp/input")

# TODO perform Window + sliding interval on dataframe, then perform aggregation per window
aggr = dataframe.groupBy("Category").count().orderBy("count", ascending=False).limit(3)

query = aggr \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Upvotes: 0

Views: 954

Answers (1)

Aman Lakhani

Reputation: 51

To aggregate data on a per-window basis, you can use the window function from the pyspark.sql.functions package.
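The window function (along with the other helpers used below) lives in pyspark.sql.functions, so the snippets that follow assume imports along these lines:

from pyspark.sql.functions import current_timestamp, window, col, sum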

For a time interval, you need to add a timestamp column to your dataframe.

newDf = csvFile.withColumn("TimeStamp", current_timestamp())

This code adds the current time to the dataframe as the data is read from the CSV.

trimmedDf2 = newDf \
    .groupBy(window(col("TimeStamp"), "5 seconds")) \
    .agg(sum("value")) \
    .select("window.start", "window.end", "sum(value)")

display(trimmedDf2)

The above code sums up the value column and groups the results into 5-second timestamp windows.
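Applied to the streaming dataframe from the question, a minimal sketch of the same idea might look like this (the 10-second window length and 5-second slide are placeholder values; pick whatever interval suits your data):

from pyspark.sql.functions import current_timestamp, window, col

# Tag each row with the processing time as it is read from the CSV source
stamped = dataframe.withColumn("TimeStamp", current_timestamp())

# Count each Category per sliding window: 10-second windows, sliding every 5 seconds
windowedCounts = stamped \
    .groupBy(window(col("TimeStamp"), "10 seconds", "5 seconds"), col("Category")) \
    .count()

query = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Within each window, the row with the highest count then gives the most common category.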


Weekly Aggregation using Windows Function in Spark

You can also use the link above for reference.

Upvotes: 1
