Mike Sukmanowsky

Reputation: 4477

Is proper event-time sessionization possible with Spark Structured Streaming?

I've been playing around with Spark Structured Streaming and mapGroupsWithState (specifically, following the StructuredSessionization example in the Spark source). I want to confirm some limitations that I believe mapGroupsWithState has for my use case.

A session for my purposes is a group of uninterrupted activity for a user such that no two chronologically ordered (by event time, not processing time) events are separated by more than some developer-defined duration (30 minutes is common).

An example will help before jumping into code:

{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}

For the stream above, a session is defined with a 30-minute period of inactivity. In a streaming context, we should end up with one session (the second session has yet to complete):

[
  {
    "user_id": "mike",
    "startTimestamp": "2018-01-01T00:00:00",
    "endTimestamp": "2018-01-01T00:05:00"
  }
]
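To make the session definition concrete, here is a minimal plain-Scala sketch. It involves no Spark, and the names GapSessions and sessionize are hypothetical. It splits event times into a new session whenever two consecutive events, ordered by event time, are more than the gap apart. Applied to the four example events with a 30-minute gap, it yields the 00:00 to 00:05 session plus a second session starting at 00:45. That second session is the one still open in the streaming case.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical plain-Scala model of the session definition (not Spark API code).
object GapSessions {
  final case class Session(start: LocalDateTime, end: LocalDateTime)

  // Splits event times into sessions: a new session starts whenever the next
  // event (in event-time order) is more than `gap` after the previous one.
  def sessionize(times: Seq[LocalDateTime], gap: Duration): List[Session] = {
    val sorted = times.sortWith((a, b) => a.isBefore(b))
    sorted
      .foldLeft(List.empty[Session]) {
        // Within `gap` of the current session's last event: extend that session.
        case (current :: done, t) if !t.isAfter(current.end.plus(gap)) =>
          current.copy(end = t) :: done
        // First event, or gap exceeded: open a new session.
        case (acc, t) =>
          Session(t, t) :: acc
      }
      .reverse
  }
}
```

Because the helper sorts its input, the arrival order of events does not affect the result. An event exactly 30 minutes after the previous one still counts as the same session, matching "separated by more than" in the definition.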

Now consider the following Spark driver program:

import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object StructuredSessionizationV2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("StructuredSessionizationRedux")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    implicit val ctx = spark.sqlContext
    val input = MemoryStream[String]

    val EVENT_SCHEMA = new StructType()
      .add($"event_time".string)
      .add($"user_id".string)

    val events = input.toDS()
      .select(from_json($"value", EVENT_SCHEMA).alias("json"))
      .select($"json.*")
      .withColumn("event_time", to_timestamp($"event_time"))
      .withWatermark("event_time", "1 hours") // watermark = max event time seen so far minus 1 hour
    events.printSchema()

    val sessionized = events
      .groupByKey(row => row.getAs[String]("user_id"))
      .mapGroupsWithState[SessionState, SessionOutput](GroupStateTimeout.EventTimeTimeout) {
      case (userId: String, events: Iterator[Row], state: GroupState[SessionState]) =>
        println(s"state update for user ${userId} (current watermark: ${new Timestamp(state.getCurrentWatermarkMs())})")
        if (state.hasTimedOut) {
          println(s"User ${userId} has timed out, sending final output.")
          val finalOutput = SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = true
          )
          // Drop this user's state
          state.remove()
          finalOutput
        } else {
          val timestamps = events.map(_.getAs[Timestamp]("event_time").getTime).toSeq
          println(s"User ${userId} has new events (min: ${new Timestamp(timestamps.min)}, max: ${new Timestamp(timestamps.max)}).")
          val newState = if (state.exists) {
            println(s"User ${userId} has existing state.")
            val oldState = state.get
            SessionState(
              startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
              endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
            )
          } else {
            println(s"User ${userId} has no existing state.")
            SessionState(
              startTimestampMs = timestamps.min,
              endTimestampMs = timestamps.max
            )
          }
          state.update(newState)
          // Event-time timeout: fires only once the watermark passes endTimestampMs + 30 minutes
          state.setTimeoutTimestamp(newState.endTimestampMs, "30 minutes")
          println(s"User ${userId} state updated. Timeout now set to ${new Timestamp(newState.endTimestampMs + (30 * 60 * 1000))}")
          SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = false
          )
        }
      }

    val eventsQuery = sessionized
      .writeStream
      .queryName("events")
      .outputMode("update")
      .format("console")
      .start()

    input.addData(
      """{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}"""
    )
    input.addData(
      """{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}"""
    )
    eventsQuery.processAllAvailable()
  }

  case class SessionState(startTimestampMs: Long, endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  case class SessionOutput(userId: String, startTimestampMs: Long, endTimestampMs: Long, durationMs: Long, expired: Boolean)
}

Output of that program is:

root
 |-- event_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)

state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514783100000|    300000|  false|
+------+----------------+--------------+----------+-------+

state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514785500000|   2700000|  false|
+------+----------------+--------------+----------+-------+

Given my session definition, the single event in the second batch should expire the session state and thus start a new session. However, the watermark (2017-12-31 23:05:00.0) has not passed the state's timeout (2018-01-01 00:35:00.0). As a result, the state isn't expired, and the event is erroneously added to the existing session. This happens even though 40 minutes (00:05 to 00:45) have passed since the latest timestamp in the previous batch.

I think the only way for session state expiration to work as I'm hoping is if enough events from different users were received within the batch to advance the watermark past the state timeout for mike.
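The timestamps in batch 1 can be checked with a little arithmetic. This is a rough plain-Scala model (the WatermarkCheck object is hypothetical, no Spark) that rests on two assumptions. First, the watermark used in a batch is the maximum event time seen in earlier batches minus the 1-hour delay. Second, an event-time timeout fires only once the watermark is strictly past the timeout timestamp. With mike's session ending at 00:05, the watermark is 23:05 on the previous day, well short of the 00:35 timeout. The model does not capture the order in which Spark invokes the state function.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical arithmetic model (not Spark code) of the event-time timeout check:
// watermark = max event time seen so far - watermark delay, and a state set with
// setTimeoutTimestamp(end, "30 minutes") times out only when the watermark is
// strictly after end + 30 minutes.
object WatermarkCheck {
  val delay: Duration = Duration.ofHours(1)  // withWatermark("event_time", "1 hours")
  val gap: Duration = Duration.ofMinutes(30) // setTimeoutTimestamp(..., "30 minutes")

  def watermark(maxEventTime: LocalDateTime): LocalDateTime =
    maxEventTime.minus(delay)

  def timedOut(currentWatermark: LocalDateTime, sessionEnd: LocalDateTime): Boolean =
    currentWatermark.isAfter(sessionEnd.plus(gap))

  // Max event time at which the watermark exactly reaches the timeout timestamp;
  // only events strictly later than this push the watermark past it.
  def maxEventTimeAtTimeout(sessionEnd: LocalDateTime): LocalDateTime =
    sessionEnd.plus(gap).plus(delay)
}
```

For mike's first session this gives 01:35. Under these assumptions, events from some user with an event time after 01:35 would have to be seen before the watermark moves past mike's timeout.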

I suppose one could also mess with the stream's watermark, but I can't think of how I'd do that to accomplish my use case.

Is this accurate? Am I missing anything about how to properly do event-time-based sessionization in Spark?

Upvotes: 12

Views: 2548

Answers (3)

Jason Heo

Reputation: 10236

As of Spark 3.2.0, Spark supports session windows natively via the session_window function.

https://databricks.com/blog/2021/10/12/native-support-of-session-window-in-spark-structured-streaming.html

Upvotes: 0

Jungtaek Lim

Reputation: 1708

EDIT: I think I need to answer the specific point of the original question instead of only providing a full solution.

To add to Arun's answer: the state function of map/flatMapGroupsWithState is called first for keys that have new events, and only afterwards for timed-out states. Because of how this works, your code resets the timeout in the same batch in which the state should have timed out.

So while you can leverage the timeout feature to have the state function called even when a batch contains no events for that key, you still need to deal with the current watermark manually. That's why I set the timeout to the earliest session's end timestamp and handle all evictions whenever the function is called.
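The eviction step described above can be sketched in plain Scala. This is not the code from the linked gist; the SessionEviction object is hypothetical. It assumes the state holds a list of sessions whose end timestamp (epoch millis) is the last event time plus the gap. Sessions ending before the current watermark are emitted as final, and the rest are kept. The earliest remaining end is the value one would pass to GroupState.setTimeoutTimestamp, so that the function is called again even if no new events arrive for the key.

```scala
// Hypothetical model of watermark-driven eviction (plain Scala, not Spark API code).
object SessionEviction {
  // start/end are epoch millis; end is assumed to be last event time + gap.
  final case class Session(start: Long, end: Long)

  // Returns (sessions to emit as final, sessions to keep, next timeout to set).
  def evict(sessions: Seq[Session], watermarkMs: Long): (Seq[Session], Seq[Session], Option[Long]) = {
    // A session is closed once the watermark has moved past its end.
    val (closed, open) = sessions.partition(_.end < watermarkMs)
    // Earliest end among the kept sessions; None means the state can be removed.
    val nextTimeout = if (open.isEmpty) None else Some(open.map(_.end).min)
    (closed, open, nextTimeout)
  }
}
```

Running this check on every invocation, not only when hasTimedOut is true, keeps eviction correct even when the key also receives new events in the same batch.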

——

You can refer to the code linked below to see how to implement a session window with event time and watermark via flatMapGroupsWithState.

NOTE: I didn't clean up the code, and it tries to support both output modes. Once you decide on an output mode, you can remove the unrelated code to make it simpler.

EDIT2: I had a wrong assumption regarding flatMapGroupsWithState: events are not guaranteed to be sorted.

I've just updated the code: https://gist.github.com/HeartSaVioR/9a3aeeef0f1d8ee97516743308b14cd6#file-eventtimesessionwindowimplementationviaflatmapgroupswithstate-scala-L32-L189

Upvotes: 0

arunmahadevan

Reputation: 368

The implementation you have provided does not seem to work if the watermark interval is greater than the session gap duration.

For the logic you have shown to work, you need to set the watermark interval to less than 30 minutes.

If you really want the watermark interval to be independent of (or greater than) the session gap duration, you need to wait until the watermark passes the session's end timestamp plus the gap before expiring the state. The merging logic also seems to merge windows blindly; it should take the gap duration into account before merging.
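A gap-aware merge, as opposed to taking the min and max of old and new timestamps, could look like the minimal plain-Scala sketch below. The GapAwareMerge object is hypothetical. It assumes the state keeps a list of sessions, each with first and last event time in epoch millis, rather than a single SessionState. It sorts everything first because events within a group are not guaranteed to be in order. With the question's data, the 00:45 event is 40 minutes after the 00:05 end, so it starts a new session instead of being merged.

```scala
// Hypothetical gap-aware merge of new events into existing sessions (plain Scala).
object GapAwareMerge {
  // start and lastEvent are epoch millis of the first and last event in a session.
  final case class Session(start: Long, lastEvent: Long)

  // Combines existing sessions with new (possibly unsorted) event times.
  // Two sessions are joined only if the later one starts within gapMs
  // of the earlier one's last event.
  def merge(existing: Seq[Session], newEvents: Seq[Long], gapMs: Long): List[Session] = {
    val all = (existing ++ newEvents.map(t => Session(t, t))).sortBy(_.start)
    all
      .foldLeft(List.empty[Session]) {
        // Within the gap (or overlapping): extend the current session.
        case (cur :: done, s) if s.start - cur.lastEvent <= gapMs =>
          cur.copy(lastEvent = math.max(cur.lastEvent, s.lastEvent)) :: done
        // Gap exceeded: keep s as a separate session.
        case (acc, s) =>
          s :: acc
      }
      .reverse
  }
}
```

A late event that lands within the gap of two existing sessions joins them into one. This is why a session should only be emitted as final after the watermark has passed its last event plus the gap.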

Upvotes: 1
