secfree

Reputation: 4677

Does spark.sql.Dataset.groupByKey support window operations like groupBy?

In Spark Structured Streaming, we can do window operations on event time with groupBy like:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

Does groupByKey also support window operations?

Thanks.

Upvotes: 1

Views: 1282

Answers (2)

eje

Reputation: 945

It is possible to write a helper that generates a time-windowing key function to give to groupByKey.

object windowing {
    import java.sql.Timestamp
    import java.time.Instant

    /** Given:
     *  - a row type R
     *  - a function from R to its Timestamp
     *  - a window width in seconds
     *  return a key function that allows groupByKey to do windowing.
     */
    def windowBy[R](f: R => Timestamp, width: Int): R => (Timestamp, Timestamp) = {
        val w = width.toLong * 1000L
        (row: R) => {
            val msCur = f(row).getTime()
            val msLB = (msCur / w) * w  // floor to a multiple of the window width
            val instLB = Instant.ofEpochMilli(msLB)
            val instUB = Instant.ofEpochMilli(msLB + w)
            (Timestamp.from(instLB), Timestamp.from(instUB))
        }
    }
}

And in your example, it might be used like this:

import java.sql.Timestamp

case class MyRow(timestamp: Timestamp, word: String)

val windowBy60 = windowing.windowBy[MyRow](_.timestamp, 60)

// count words by time window
words.as[MyRow]
  .groupByKey(windowBy60)
  .count()

Or counting by (window, word) pairs:

words.as[MyRow]
  .groupByKey(row => (windowBy60(row), row.word))
  .count()
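
Note that windowBy above yields tumbling windows (width and slide are equal), while the groupBy example in the question uses a sliding window ("10 minutes" wide, sliding every "5 minutes"). As a sketch of one way to get sliding windows out of groupByKey, you can replicate each row into every window that contains it with flatMap before grouping; slidingWindowsFor is a hypothetical helper and assumes the width is a multiple of the slide:

import java.sql.Timestamp

// Hypothetical helper: all (start, end) windows of the given width and
// slide (both in milliseconds) that contain the timestamp ts. Assumes
// width is a multiple of slide, as in the question's example.
def slidingWindowsFor(ts: Timestamp, widthMs: Long, slideMs: Long): Seq[(Timestamp, Timestamp)] = {
  val ms = ts.getTime
  val lastStart = (ms / slideMs) * slideMs        // latest window start containing ms
  val firstStart = lastStart - widthMs + slideMs  // earliest window start containing ms
  (firstStart to lastStart by slideMs).map { start =>
    (new Timestamp(start), new Timestamp(start + widthMs))
  }
}

// Each row contributes one record per (window, word) key it falls into.
words.as[MyRow]
  .flatMap(r => slidingWindowsFor(r.timestamp, 10 * 60 * 1000L, 5 * 60 * 1000L).map(w => (w, r.word)))
  .groupByKey(identity)
  .count()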

Upvotes: 2

Alper t. Turker

Reputation: 35249

Yes and no. The window function cannot be used with groupByKey directly, as it is applicable only to the SQL / DataFrame API, but you can always extend the record with a window field:

val dfWithWindow = df.withColumn("window", window(...))

case class Window(start: java.sql.Timestamp, end: java.sql.Timestamp)
case class MyRecordWithWindow(..., window: Window)

and use it for grouping:

dfWithWindow.as[MyRecordWithWindow].groupByKey(_.window).mapGroups(...)
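
Put together against the question's words stream, a minimal sketch of that flow might look like this (WordWithWindow is an illustrative name, and the grouping key mirrors the question's (window, word) pairs):

import java.sql.Timestamp
import org.apache.spark.sql.functions.window
import spark.implicits._

case class Window(start: Timestamp, end: Timestamp)
case class WordWithWindow(timestamp: Timestamp, word: String, window: Window)

// Attach the window struct column, then move back to the typed API.
val withWindow = words
  .withColumn("window", window($"timestamp", "10 minutes", "5 minutes"))
  .as[WordWithWindow]

// Count by (window, word), as in the question.
val windowedCounts = withWindow
  .groupByKey(r => (r.window, r.word))
  .count()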

Upvotes: 1
