Reputation: 4677
In Spark Structured Streaming, we can do window operations on event time with groupBy, like this:
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Does groupByKey also support window operations?
Thanks.
Upvotes: 1
Views: 1282
Reputation: 945
It is possible to write a helper that generates a time-windowing key function to pass to groupByKey. (Note that this gives tumbling, i.e. non-overlapping, windows of a fixed width, unlike the sliding window in the question's groupBy example.)
object windowing {
  import java.sql.Timestamp
  import java.time.Instant

  /** Given:
    *   - a row type R
    *   - a function from R to the row's event Timestamp
    *   - a window width in seconds
    * returns a function mapping each row to its (start, end) window,
    * usable as a grouping key for groupByKey.
    */
  def windowBy[R](f: R => Timestamp, width: Int): R => (Timestamp, Timestamp) = {
    val w = width.toLong * 1000L            // window width in milliseconds
    (row: R) => {
      val msCur = f(row).getTime()          // event time in epoch milliseconds
      val msLB = (msCur / w) * w            // floor to the window's lower bound
      val instLB = Instant.ofEpochMilli(msLB)
      val instUB = Instant.ofEpochMilli(msLB + w)
      (Timestamp.from(instLB), Timestamp.from(instUB))
    }
  }
}
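As a quick sanity check of the boundary arithmetic (the timestamp below is illustrative, and assumes a JVM time zone with a whole-minute UTC offset), a 60-second window maps any timestamp to its enclosing minute:

import java.sql.Timestamp

// apply the helper directly to a Timestamp by using identity as the extractor
val toWindow = windowing.windowBy[Timestamp](identity, 60)
toWindow(Timestamp.valueOf("2024-01-01 00:00:42"))
// -> (2024-01-01 00:00:00.0, 2024-01-01 00:01:00.0)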
And in your example, it might be used like this:
case class MyRow(timestamp: Timestamp, word: String)
val windowBy60 = windowing.windowBy[MyRow](_.timestamp, 60)
// count words by time window
words.as[MyRow]
  .groupByKey(windowBy60)
  .count()
Or counting by (window, word) pairs:
words.as[MyRow]
  .groupByKey(row => (windowBy60(row), row.word))
  .count()
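For completeness, a sketch of the whole thing run on a small batch Dataset, reusing MyRow and windowBy60 from above (the SparkSession setup and sample rows are illustrative, not from the original question):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("windowBy-demo").getOrCreate()
import spark.implicits._

// made-up rows spanning two one-minute windows
val words = Seq(
  MyRow(Timestamp.valueOf("2024-01-01 00:00:10"), "spark"),
  MyRow(Timestamp.valueOf("2024-01-01 00:00:50"), "spark"),
  MyRow(Timestamp.valueOf("2024-01-01 00:01:05"), "stream")
).toDS()

// "spark" appears twice in the first window, "stream" once in the second
words.groupByKey(row => (windowBy60(row), row.word)).count().show(false)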
Upvotes: 2
Reputation: 35249
Yes and no. The window function cannot be used directly with groupByKey, since it is applicable only to the SQL / DataFrame
API, but you can always extend the record with a window field:
val dfWithWindow = df.withColumn("window", window(...))
case class Window(start: java.sql.Timestamp, end: java.sql.Timestamp)
case class MyRecordWithWindow(..., window: Window)
and use it for grouping:
dfWithWindow.as[MyRecordWithWindow].groupByKey(_.window).mapGroups(...)
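For instance, a sketch of that approach for the word-count example (the field names of MyRecordWithWindow and the use of a tumbling window are assumptions for illustration; df is the question's streaming DataFrame):

import java.sql.Timestamp
import org.apache.spark.sql.functions.window
import spark.implicits._

case class Window(start: Timestamp, end: Timestamp)
case class MyRecordWithWindow(timestamp: Timestamp, word: String, window: Window)

// attach the window struct as an ordinary column...
val dfWithWindow = df.withColumn("window", window($"timestamp", "10 minutes"))

// ...then switch to the typed API and group on (window, word)
dfWithWindow.as[MyRecordWithWindow]
  .groupByKey(r => (r.window, r.word))
  .count()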
Upvotes: 1