Reputation: 2708
I'm receiving a DStream from Kafka, and I want to group all messages by key within some sliding window.
The point is that this window needs to be based on the timestamps provided in each message (a separate field):
Message structure
--------------------------
key1, ..., ..., 1557678233
key1, ..., ..., 1557678234
key2, ..., ..., 1557678235
So, for each key, I want to consider only those messages where
timestamp of the last message - timestamp of the first message
<= 5 minutes
As I can see from this question, that's not feasible, since Spark only tracks system time for events. The person there suggests using updateStateByKey, which is not very clear to me...
Maybe we could achieve this using another approach?
What about including the timestamp differences in the combiners for the combineByKey function, with further aggregation and filtering by a duration threshold? Something like the rough sketch below.
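For example, something along these lines (just a rough sketch of what I mean; the Message case class and the 300-second threshold are my own assumptions, not tested code):

import org.apache.spark.rdd.RDD

case class Message(key: String, payload: String, ts: Long)  // ts = epoch seconds from the message field

// For each key, keep (minTs, maxTs) in the combiner, then retain only the keys
// whose timestamp span stays within 5 minutes (300 seconds).
def keysWithinWindow(messages: RDD[Message]): RDD[String] =
  messages
    .map(m => (m.key, m.ts))
    .combineByKey(
      (ts: Long) => (ts, ts),                                                             // createCombiner
      (acc: (Long, Long), ts: Long) => (math.min(acc._1, ts), math.max(acc._2, ts)),      // mergeValue
      (a: (Long, Long), b: (Long, Long)) => (math.min(a._1, b._1), math.max(a._2, b._2))  // mergeCombiners
    )
    .filter { case (_, (minTs, maxTs)) => maxTs - minTs <= 300 }
    .keys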
Please add your thoughts on that, or share your solution if you have faced the same problem...
Thanks!
Upvotes: 1
Views: 392
Reputation: 1323
Tested with the sample data below; I assume the timestamp is in epoch format:
[key1, ..., ..., 1557678233]
[key1, ..., ..., 1557678234]
[key2, ..., ..., 1557678235]
[key2, ..., ..., 1557678240]
[key2, ..., ..., 1557678271]
[key3, ..., ..., 1557678635]
[key3, ..., ..., 1557678636]
[key3, ..., ..., 1557678637]
[key3, ..., ..., 1557678638]
[key3, ..., ..., 1557678999]
//-- Create a UDF that returns whether the records for a key should be processed or rejected
scala> spark.udf.register("recordStatusUDF", (ts: String) => {
     |   // parse the comma-separated epoch timestamps and compare the span to 5 minutes (300 s)
     |   val ts_array = ts.split(",", -1).map(_.trim.toLong)
     |   if ((ts_array.max - ts_array.min) <= 300) "process" else "reject"
     | })
res83: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
//-- Create the schema (needs the Spark SQL type imports)
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(StructField("key", StringType, true),StructField("col2", StringType, true),StructField("col3", StringType, true),StructField("epoch_ts", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(epoch_ts,StringType,true))
//-- Create a DataFrame from the messages (rdd is an RDD[Row]) and register it as a temp view
scala> spark.createDataFrame(rdd,schema).createOrReplaceTempView("kafka_messages")
scala> spark.sql(s""" select x.key, recordStatusUDF(x.ts) as action_ind from ( select key, concat_ws(",", collect_list(epoch_ts)) as ts from kafka_messages group by key)x """).createOrReplaceTempView("action")
scala> val result = spark.sql(s""" select km.* from kafka_messages km inner join action ac on km.key = ac.key and ac.action_ind = "process" """)
result: org.apache.spark.sql.DataFrame = [key: string, col2: string ... 2 more fields]
scala> result.show(false)
+----+----+----+-----------+
|key |col2|col3|epoch_ts |
+----+----+----+-----------+
|key1| ...| ...| 1557678233|
|key1| ...| ...| 1557678234|
|key2| ...| ...| 1557678235|
|key2| ...| ...| 1557678240|
|key2| ...| ...| 1557678271|
+----+----+----+-----------+
You can apply the above code to each RDD of Kafka messages (e.g., inside foreachRDD, as in the sketch below). Hope this is helpful.
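For completeness, here is a rough sketch of how the rdd used above could be built from a Kafka direct stream inside foreachRDD (the stream variable, the ConsumerRecord[String, String] value type, and the comma-separated message layout are assumptions):

import org.apache.spark.sql.Row

stream.foreachRDD { kafkaRdd =>
  // Assumption: each record value looks like "key1, ..., ..., 1557678233"
  val rdd = kafkaRdd.map { record =>
    val parts = record.value().split(",", -1).map(_.trim)
    Row(parts(0), parts(1), parts(2), parts(3))
  }
  spark.createDataFrame(rdd, schema).createOrReplaceTempView("kafka_messages")
  // ...then run the two spark.sql(...) statements above on this micro-batch
}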
Upvotes: 0
Reputation: 21
Is it possible? Without a doubt. Apache Beam, which, among others, provides an Apache Spark backend, can easily handle such operations.
However, that's definitely not something you want to implement yourself, unless you have significant development resources and a lot of know-how at your disposal. And if you had, you probably wouldn't be asking this question in the first place.
Handling late events, out-of-order events, and recovering from node failures can be tricky at best, with a large number of edge cases.
Furthermore, it would be obsolete before you actually implemented it: DStream is already considered a legacy API and is likely to reach its end of life sooner rather than later. At the same time, Structured Streaming can already handle event-time windowing out of the box, for example:
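Here is a minimal sketch of event-time windowing in Structured Streaming (the Kafka options, the column layout, and the watermark duration are assumptions, and the spark-sql-kafka connector is required; this is not a drop-in solution):

import org.apache.spark.sql.functions._

// Parse the comma-separated value and turn the epoch-seconds field into an event-time column
val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "messages")
  .load()
  .selectExpr("CAST(value AS STRING) AS raw")
  .select(split(col("raw"), ",").as("parts"))
  .select(
    trim(col("parts").getItem(0)).as("key"),
    trim(col("parts").getItem(3)).cast("long").cast("timestamp").as("event_time")
  )

// Group by key and a 5-minute event-time window; the watermark bounds how late events may arrive
val windowed = parsed
  .withWatermark("event_time", "10 minutes")
  .groupBy(col("key"), window(col("event_time"), "5 minutes"))
  .count()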
Upvotes: 2