Reputation: 4477
I've been playing around with Spark Structured Streaming and mapGroupsWithState (specifically following the StructuredSessionization example in the Spark source). I want to confirm some limitations I believe exist with mapGroupsWithState given my use case.
A session for my purposes is a group of uninterrupted activity for a user such that no two chronologically ordered (by event time, not processing time) events are separated by more than some developer-defined duration (30 minutes is common).
An example will help before jumping into code:
{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}
For the stream above, a session is defined with a 30-minute period of inactivity. In a streaming context, we should end up with one completed session (the second has yet to complete):
[
{
"user_id": "mike",
"startTimestamp": "2018-01-01T00:00:00",
"endTimestamp": "2018-01-01T00:05:00"
}
]
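To make the session definition concrete outside of Spark, here is a minimal plain-Scala sketch of gap-based sessionization. The names GapSessions, Session and sessionize are hypothetical (they are not part of Spark or of the program below), and timestamps are given as milliseconds relative to the first event for brevity.

```scala
object GapSessions {
  // Hypothetical helper type: a session spans its first and last event time.
  final case class Session(startMs: Long, endMs: Long)

  // Walk the events in event-time order. An event extends the current session
  // when it is at most gapMs after the session's last event; otherwise it
  // starts a new session.
  def sessionize(timestampsMs: Seq[Long], gapMs: Long): List[Session] =
    timestampsMs.sorted.foldLeft(List.empty[Session]) {
      case (current :: done, ts) if ts - current.endMs <= gapMs =>
        current.copy(endMs = ts) :: done
      case (sessions, ts) =>
        Session(ts, ts) :: sessions
    }.reverse
}
```

With the four events above (offsets of 0, 1, 5 and 45 minutes) and a 30-minute gap, this yields two sessions: the completed one covering minutes 0 to 5, and a second one starting at minute 45 that is still open in a streaming context.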
Now consider the following Spark driver program:
import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object StructuredSessionizationV2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("StructredSessionizationRedux")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    implicit val ctx = spark.sqlContext
    val input = MemoryStream[String]

    val EVENT_SCHEMA = new StructType()
      .add($"event_time".string)
      .add($"user_id".string)

    val events = input.toDS()
      .select(from_json($"value", EVENT_SCHEMA).alias("json"))
      .select($"json.*")
      .withColumn("event_time", to_timestamp($"event_time"))
      .withWatermark("event_time", "1 hours")
    events.printSchema()

    val sessionized = events
      .groupByKey(row => row.getAs[String]("user_id"))
      .mapGroupsWithState[SessionState, SessionOutput](GroupStateTimeout.EventTimeTimeout) {
        case (userId: String, events: Iterator[Row], state: GroupState[SessionState]) =>
          println(s"state update for user ${userId} (current watermark: ${new Timestamp(state.getCurrentWatermarkMs())})")
          if (state.hasTimedOut) {
            println(s"User ${userId} has timed out, sending final output.")
            val finalOutput = SessionOutput(
              userId = userId,
              startTimestampMs = state.get.startTimestampMs,
              endTimestampMs = state.get.endTimestampMs,
              durationMs = state.get.durationMs,
              expired = true
            )
            // Drop this user's state
            state.remove()
            finalOutput
          } else {
            val timestamps = events.map(_.getAs[Timestamp]("event_time").getTime).toSeq
            println(s"User ${userId} has new events (min: ${new Timestamp(timestamps.min)}, max: ${new Timestamp(timestamps.max)}).")
            val newState = if (state.exists) {
              println(s"User ${userId} has existing state.")
              val oldState = state.get
              SessionState(
                startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
                endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
              )
            } else {
              println(s"User ${userId} has no existing state.")
              SessionState(
                startTimestampMs = timestamps.min,
                endTimestampMs = timestamps.max
              )
            }
            state.update(newState)
            state.setTimeoutTimestamp(newState.endTimestampMs, "30 minutes")
            println(s"User ${userId} state updated. Timeout now set to ${new Timestamp(newState.endTimestampMs + (30 * 60 * 1000))}")
            SessionOutput(
              userId = userId,
              startTimestampMs = state.get.startTimestampMs,
              endTimestampMs = state.get.endTimestampMs,
              durationMs = state.get.durationMs,
              expired = false
            )
          }
      }

    val eventsQuery = sessionized
      .writeStream
      .queryName("events")
      .outputMode("update")
      .format("console")
      .start()

    input.addData(
      """{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}"""
    )
    input.addData(
      """{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}"""
    )
    eventsQuery.processAllAvailable()
  }

  case class SessionState(startTimestampMs: Long, endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  case class SessionOutput(userId: String, startTimestampMs: Long, endTimestampMs: Long, durationMs: Long, expired: Boolean)
}
Output of that program is:
root
|-- event_time: timestamp (nullable = true)
|-- user_id: string (nullable = true)
state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
| mike| 1514782800000| 1514783100000| 300000| false|
+------+----------------+--------------+----------+-------+
state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
| mike| 1514782800000| 1514785500000| 2700000| false|
+------+----------------+--------------+----------+-------+
Given my session definition, the single event in the second batch should trigger an expiry of session state and thus a new session. However, since the watermark (2017-12-31 23:05:00.0) has not passed the state's timeout (2018-01-01 00:35:00.0), state isn't expired and the event is erroneously added to the existing session despite the fact that more than 30 minutes have passed since the latest timestamp in the previous batch.
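The arithmetic behind that observation can be checked with a small plain-Scala sketch. It assumes the watermark is the maximum event time seen so far minus the configured delay, and that a state times out only once the watermark is strictly after its timeout timestamp. WatermarkCheck and its members are hypothetical names, and the timestamps are treated as UTC purely for illustration.

```scala
import java.time.{Duration, Instant}

object WatermarkCheck {
  // Assumption: watermark = latest event time seen so far minus the watermark delay.
  def watermark(maxEventTime: Instant, delay: Duration): Instant =
    maxEventTime.minus(delay)

  // Timeout as set in the program: session end plus the 30-minute gap.
  def timeoutAt(sessionEnd: Instant, gap: Duration): Instant =
    sessionEnd.plus(gap)

  // Assumption: the state expires only when the watermark has moved past the timeout.
  def hasTimedOut(currentWatermark: Instant, timeout: Instant): Boolean =
    currentWatermark.isAfter(timeout)

  val delay: Duration = Duration.ofHours(1)
  val gap: Duration = Duration.ofMinutes(30)

  // Values as seen when the second batch is processed.
  val lastEventOfBatch0: Instant = Instant.parse("2018-01-01T00:05:00Z")
  val batch1Watermark: Instant = watermark(lastEventOfBatch0, delay) // 2017-12-31T23:05:00Z
  val timeout: Instant = timeoutAt(lastEventOfBatch0, gap)           // 2018-01-01T00:35:00Z
}
```

Under these assumptions, an event with an event time later than 01:35:00 would have to arrive (from any user) before the watermark could pass the 00:35:00 timeout.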
I think the only way for session state expiration to work as I'm hoping is if enough events from different users were received within the batch to advance the watermark past the state timeout for mike.
I suppose one could also mess with the stream's watermark, but I can't think of how I'd do that to accomplish my use case.
Is this accurate? Am I missing anything in how to properly do event time-based sessionization in Spark?
Upvotes: 12
Views: 2548
Reputation: 1708
EDIT: I think I should address the specific point of the original question instead of providing a full solution.
To add to Arun’s answer: the state function of map/flatMapGroupsWithState is called first for the keys that have new events, and then for the states that have timed out. Given that ordering, your code resets the timeout in the very batch in which the state should have timed out.
So while you can use the timeout feature to have the state function called even when a batch contains no events for a key, you still need to compare against the current watermark yourself. That’s why I set the timeout to the session end timestamp of the earliest session, and handle all evictions whenever the function is called.
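As a rough illustration of handling the watermark manually, the plain-Scala sketch below splits the sessions held in state into those the watermark has already closed and those still open, and picks the next timeout from the open ones. It is not the code from the gist: SessionEviction, Session, evict and nextTimeoutMs are hypothetical names, and it assumes a session closes at its last event time plus the gap.

```scala
object SessionEviction {
  // Hypothetical state entry: first and last event time of one session.
  final case class Session(startMs: Long, lastEventMs: Long) {
    // Assumption: the session can no longer be extended after this point.
    def closesAtMs(gapMs: Long): Long = lastEventMs + gapMs
  }

  // Returns (evicted, stillOpen). A session is evicted once the watermark
  // has moved past the point where it closes.
  def evict(sessions: List[Session], watermarkMs: Long, gapMs: Long): (List[Session], List[Session]) =
    sessions.partition(_.closesAtMs(gapMs) < watermarkMs)

  // The next timeout to register: the earliest closing time among open sessions,
  // or None when nothing is left in state.
  def nextTimeoutMs(open: List[Session], gapMs: Long): Option[Long] =
    if (open.isEmpty) None else Some(open.map(_.closesAtMs(gapMs)).min)
}
```

Inside a real state function, the watermark would come from state.getCurrentWatermarkMs() and the chosen timeout would be passed to state.setTimeoutTimestamp, both of which appear in the question's program.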
——
You can refer to the code linked below to see how to implement session windows with event time and a watermark via flatMapGroupsWithState.
NOTE: I didn't clean up the code, and it tries to support both output modes, so once you decide on an output mode you can remove the unrelated code to make it simpler.
EDIT2: I had a wrong assumption about flatMapGroupsWithState: events are not guaranteed to be sorted.
I have just updated the code: https://gist.github.com/HeartSaVioR/9a3aeeef0f1d8ee97516743308b14cd6#file-eventtimesessionwindowimplementationviaflatmapgroupswithstate-scala-L32-L189
Upvotes: 0
Reputation: 368
The implementation you have provided does not seem to work if the watermark interval is greater than the session gap duration.
For the logic you have shown to work, you need to set the watermark interval to less than 30 minutes.
If you really want the watermark interval to be independent of (or more than) the session gap duration, you need to wait until the watermark passes (watermark + gap) to expire the state. The merging logic also seems to merge the windows blindly; it should take the gap duration into account before merging.
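A gap-aware merge could look roughly like the plain-Scala sketch below. It reuses the shape of the question's SessionState, but GapAwareMerge, withinGap and merge are hypothetical names, and the sketch only covers merging one existing session with one incoming interval.

```scala
object GapAwareMerge {
  final case class SessionState(startTimestampMs: Long, endTimestampMs: Long)

  // Two intervals belong to the same session only if neither starts more
  // than gapMs after the other one ends (overlapping intervals qualify).
  def withinGap(a: SessionState, b: SessionState, gapMs: Long): Boolean =
    a.startTimestampMs - b.endTimestampMs <= gapMs &&
      b.startTimestampMs - a.endTimestampMs <= gapMs

  // Returns one merged session when the intervals are within the gap,
  // otherwise both sessions ordered by start time.
  def merge(old: SessionState, incoming: SessionState, gapMs: Long): List[SessionState] =
    if (withinGap(old, incoming, gapMs))
      List(SessionState(
        math.min(old.startTimestampMs, incoming.startTimestampMs),
        math.max(old.endTimestampMs, incoming.endTimestampMs)))
    else
      List(old, incoming).sortBy(_.startTimestampMs)
}
```

Applied to the question's data, the existing session (00:00 to 00:05) and the new event at 00:45 are 40 minutes apart, so with a 30-minute gap they stay separate instead of being merged into one 45-minute session.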
Upvotes: 1