Reputation: 3283
I have a stream like
+-----+-------------------+------+
|group|               time| label|
+-----+-------------------+------+
|    a|2020-01-01 10:49:00|   red|
|    a|2020-01-01 10:51:00|yellow|
|    a|2020-01-01 12:49:00|  blue|
|    b|2020-01-01 12:44:00|   red|
|    b|2020-01-01 12:46:00|  blue|
|    c|2020-01-01 12:46:00| green|
+-----+-------------------+------+
I would like to use Spark Streaming to keep, for each group, only the row with the most recent time.
With a Spark DataFrame I would use a window function:
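import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._ // for the $"..." column syntax

// Rank the rows of each group from newest to oldest
val window = Window
  .partitionBy("group")
  .orderBy($"time".desc)

df
  .withColumn("rn", row_number().over(window))
  .filter("rn = 1") // keep only the newest row per group
  .drop("rn")
  .show()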
or alternatively
df
  .orderBy($"time".desc)
  .dropDuplicates("group")
What is the best way to do the same operation in Spark Streaming, and how can I save the results so that only the most recent row per group is stored?
Update:
I am trying to keep only one row per group, the one with the most recent time. Is it possible to use a stateful transformation with mapGroupsWithState for this purpose?
Upvotes: 0
Views: 1902
Reputation: 18475
When doing an aggregation in Spark Structured Streaming, you typically define a Window first. This Window operation defines the time interval over which your aggregation ("maximum time, grouped by column 'group'") is calculated.
Let's say you want the maximum time in a 5-minute (non-sliding) window; you would then define:
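import org.apache.spark.sql.functions.{col, max, unix_timestamp, window}
import org.apache.spark.sql.types.TimestampType
import spark.implicits._ // for the $"..." column syntax

val df = spark.readStream
  .format("kafka")
  [...]
  .selectExpr("CAST(value AS STRING) as group", "timestamp")

val dfGrouped = df
  .select(
    col("group"),
    col("timestamp"),
    // epoch seconds, so the maximum is taken over a numeric column
    unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("time_unix"))
  // one bucket per (group, 5-minute tumbling window) pair
  .groupBy(col("group"), window($"timestamp", "5 minutes"))
  .agg(max("time_unix").alias("max_time_unix"))
  // cast the epoch seconds back to a timestamp
  .withColumn("time", col("max_time_unix").cast(TimestampType))
  .drop("window", "max_time_unix")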
Note that the query converts the timestamp with unix_timestamp so that the maximum is computed on a numeric column. Spark's max also accepts timestamp columns directly, so this conversion is optional.
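To picture what a 5-minute non-sliding window does to the sample rows, here is a minimal sketch in plain Scala (no Spark involved). It assumes windows are aligned to the Unix epoch and that timestamps are in UTC; `TumblingWindowSketch` and `windowStart` are hypothetical names used only for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object TumblingWindowSketch {
  // Hypothetical helper: start of the fixed-size window that contains `ts`.
  // Assumption: windows are aligned to the Unix epoch, timestamps are UTC.
  def windowStart(ts: LocalDateTime, windowSeconds: Long): LocalDateTime = {
    val epoch = ts.toEpochSecond(ZoneOffset.UTC)
    val start = epoch - Math.floorMod(epoch, windowSeconds)
    LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC)
  }

  def main(args: Array[String]): Unit = {
    // The three rows of group "a" from the question
    val times = Seq("2020-01-01T10:49:00", "2020-01-01T10:51:00", "2020-01-01T12:49:00")
      .map(s => LocalDateTime.parse(s))
    times.foreach { t =>
      println(s"$t falls in the window starting at ${windowStart(t, 300L)}")
    }
  }
}
```

Under these assumptions, 10:49 and 10:51 land in different windows (starting at 10:45 and 10:50), so a windowed aggregation yields one maximum per group per window rather than a single latest row per group.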
As described under Output Modes, you can then choose the update mode so that only the groups that changed are emitted. Make sure that your output sink (such as the console or a database) can handle updates without creating duplicates.
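What "handling updates without creating duplicates" means can be sketched in plain Scala, without any Spark API. The sketch assumes the sink is keyed by group, so a newer row overwrites the older one. `Event`, `applyBatch` and the in-memory `sink` map are hypothetical stand-ins for illustration, not part of Spark.

```scala
import java.time.LocalDateTime
import scala.collection.mutable

object LatestPerGroupSketch {
  // Hypothetical row type mirroring the columns in the question
  final case class Event(group: String, time: LocalDateTime, label: String)

  // Fold one micro-batch into the per-group state. Returns the new state
  // and only the rows whose group actually changed (update-style output).
  def applyBatch(state: Map[String, Event], batch: Seq[Event]): (Map[String, Event], Seq[Event]) = {
    val updated = batch.foldLeft(state) { (acc, e) =>
      acc.get(e.group) match {
        case Some(prev) if !e.time.isAfter(prev.time) => acc // not newer: keep the stored row
        case _ => acc.updated(e.group, e)                    // first or newer row wins
      }
    }
    val changed = updated.values.filter(e => !state.get(e.group).contains(e)).toSeq
    (updated, changed)
  }

  def main(args: Array[String]): Unit = {
    def ev(g: String, t: String, l: String) = Event(g, LocalDateTime.parse(t), l)
    val batch1 = Seq(ev("a", "2020-01-01T10:49:00", "red"),
                     ev("a", "2020-01-01T10:51:00", "yellow"),
                     ev("b", "2020-01-01T12:44:00", "red"))
    val batch2 = Seq(ev("a", "2020-01-01T12:49:00", "blue"),
                     ev("b", "2020-01-01T12:46:00", "blue"),
                     ev("c", "2020-01-01T12:46:00", "green"))

    val sink = mutable.Map.empty[String, Event] // stand-in for a keyed table
    var state = Map.empty[String, Event]
    for (batch <- Seq(batch1, batch2)) {
      val (next, changed) = applyBatch(state, batch)
      state = next
      changed.foreach(e => sink(e.group) = e) // upsert by key, so no duplicates
    }
    sink.toSeq.sortBy(_._1).foreach { case (_, e) => println(e) }
  }
}
```

A sink that only appends would instead accumulate one row per update, which is the duplication the paragraph above warns about.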
Upvotes: 2