Galuoises

Reputation: 3283

Spark streaming: keep the most recent value in a group

I have a stream like

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|red   |
|     a|2020-01-01 10:51:00|yellow|
|     a|2020-01-01 12:49:00|blue  |
|     b|2020-01-01 12:44:00|red   |
|     b|2020-01-01 12:46:00|blue  |
|     c|2020-01-01 12:46:00|green |
+------+-------------------+------+

I would like to use Spark Streaming to keep, for each group, only the row with the most recent time.

With a static Spark DataFrame I would use a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

val window = {
    Window
    .partitionBy("group")
    .orderBy($"time".desc)
}

df
  .withColumn("rn", row_number.over(window))
  .filter("rn = 1")
  .drop("rn")
  .show()

or alternatively

df
.orderBy($"time".desc)
.dropDuplicates("group")

What is the best way to do the same operation in Spark Streaming, and how can I save the results so that only the most recent row per group is stored?

Update: I am trying to keep only one row per group, the one with the most recent time. Is it possible to use a stateful transformation with mapGroupsWithState for this purpose?
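A sketch of the stateful idea, assuming a typed stream of rows modelled by a hypothetical Event case class (the Spark-specific wiring is shown in comments, since it needs a running stream):

```scala
import java.sql.Timestamp

case class Event(group: String, time: Timestamp, label: String)

// Pure reducer: keep the event with the later timestamp.
def latest(a: Event, b: Event): Event =
  if (a.time.after(b.time)) a else b

// With a typed stream of Event rows, the stateful version would be
// (requires org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}):
//
// def keepMostRecent(group: String, events: Iterator[Event],
//                    state: GroupState[Event]): Event = {
//   val newest = (state.getOption.toIterator ++ events).reduce(latest)
//   state.update(newest)   // remember the newest event for this group
//   newest
// }
//
// df.as[Event]
//   .groupByKey(_.group)
//   .mapGroupsWithState(GroupStateTimeout.NoTimeout)(keepMostRecent)
```

The reducer itself is pure, so the per-group state stays small: one Event per group, replaced whenever a later one arrives.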

Upvotes: 0

Views: 1902

Answers (1)

Michael Heil

Reputation: 18475

When doing an aggregation in Spark Structured Streaming you first need to define a window. This window operation defines the time interval over which your aggregation ("maximum time, grouped by column 'group'") is calculated.

Let's say you are planning to get the maximum time within a 5-minute (non-sliding) window; then you would define:

import org.apache.spark.sql.functions.{col, max, unix_timestamp, window}
import org.apache.spark.sql.types.TimestampType

val df = spark.readStream
  .format("kafka")
  [...]
  .selectExpr("CAST(value AS STRING) as group", "timestamp")

val dfGrouped = df
  .select(
    col("group"),
    col("timestamp"),
    unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("time_unix"))
  .groupBy(col("group"), window(col("timestamp"), "5 minutes"))
  .agg(max("time_unix").alias("max_time_unix"))
  .withColumn("time", col("max_time_unix").cast(TimestampType))
  .drop("window", "max_time_unix")

It is important to note that a max aggregation only works on numeric values; hence the conversion to a Unix timestamp shown above.

According to the documentation on output modes, you can then choose the update mode to get only the updated groups. Make sure that your output sink (such as the console or a database) is capable of handling updates without creating duplicates.
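For example, a console sink in update mode could look like this (a sketch only; the console sink is for debugging, and `dfGrouped` is the aggregated stream from above):

```scala
// Update output mode: only groups whose aggregate changed since the
// last trigger are emitted. A real sink (e.g. a database upsert via
// foreachBatch) must apply these updates idempotently to avoid duplicates.
val query = dfGrouped.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()
```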

Upvotes: 2
