Developer87

Reputation: 2708

Constructing window based on message timestamps in Spark DStream

I'm receiving a DStream from Kafka and I want to group all messages by key in some sliding window.

The point is that this window needs to be based on the timestamps provided in each message (as a separate field):

Message structure
--------------------------
key1, ..., ..., 1557678233
key1, ..., ..., 1557678234 
key2, ..., ..., 1557678235 

So, for each key, I want to consider only those messages where the timestamp of the last message minus the timestamp of the first message is <= 5 minutes.

As I can see from this question, that's not feasible, since Spark counts only system time for events. The answerer over there suggests using updateStateByKey, which is not very clear to me...

Maybe we could achieve this using another approach?

What about including timestamp differences in the combiners for the combineByKey function, with further summation and filtering by a duration threshold?
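To illustrate what I mean, a rough sketch (assuming pairs is an RDD[(String, Long)] of (key, epoch seconds) extracted from the messages; the names here are mine, not from any existing code):

    // Track (minTs, maxTs) per key with combineByKey, then filter by the 5-minute span
    val spans = pairs.combineByKey(
      (ts: Long) => (ts, ts),                          // createCombiner: first timestamp seen
      (acc: (Long, Long), ts: Long) =>
        (math.min(acc._1, ts), math.max(acc._2, ts)),  // mergeValue: widen the span
      (a: (Long, Long), b: (Long, Long)) =>
        (math.min(a._1, b._1), math.max(a._2, b._2))   // mergeCombiners: merge partition results
    )
    val withinThreshold = spans.filter { case (_, (minTs, maxTs)) => maxTs - minTs <= 300 }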

Please add your thoughts on that, or share your solution if you have faced the same problem...

Thanks!

Upvotes: 1

Views: 392

Answers (2)

Ajay Ahuja

Reputation: 1323

I tested with the sample data below, assuming the timestamp is in epoch (seconds) format:

[key1, ..., ..., 1557678233]
[key1, ..., ..., 1557678234]
[key2, ..., ..., 1557678235]
[key2, ..., ..., 1557678240]
[key2, ..., ..., 1557678271]
[key3, ..., ..., 1557678635]
[key3, ..., ..., 1557678636]
[key3, ..., ..., 1557678637]
[key3, ..., ..., 1557678638]
[key3, ..., ..., 1557678999]

//-- Create a UDF that decides whether a key's records should be processed or rejected

scala> spark.udf.register("recordStatusUDF", (ts:String) => {
     |     // parse the comma-separated epoch values as Longs before comparing,
     |     // so max/min are numeric rather than lexicographic
     |     val ts_array = ts.split(",",-1).map(_.trim.toLong)
     |     if ((ts_array.max - ts_array.min) <= 300) {
     |        "process"
     |     }
     |     else { "reject" }
     | })
res83: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

//-- Create schema

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(Seq(StructField("key", StringType, true),StructField("col2", StringType, true),StructField("col3", StringType, true),StructField("epoch_ts", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(epoch_ts,StringType,true))
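
//-- The rdd used below is never defined in this answer; a minimal sketch that
//-- builds it from the sample data, assuming each message arrives as a
//-- comma-separated string:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rdd = spark.sparkContext.parallelize(Seq(
     |   "key1, ..., ..., 1557678233",
     |   "key1, ..., ..., 1557678234",
     |   "key2, ..., ..., 1557678235"
     | )).map(line => Row.fromSeq(line.split(",", -1).map(_.trim)))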

//-- Create Dataframe

scala> spark.createDataFrame(rdd,schema).createOrReplaceTempView("kafka_messages")


//-- For each key, concatenate all its timestamps and flag the key as process/reject

scala> spark.sql(s""" select x.key, recordStatusUDF(x.ts) as action_ind from ( select key, concat_ws(",", collect_list(epoch_ts)) as ts from kafka_messages group by key)x """).createOrReplaceTempView("action")

//-- Keep only the rows whose key was flagged "process"

scala> val result = spark.sql(s""" select km.* from kafka_messages km inner join action ac on km.key = ac.key and ac.action_ind = "process" """)
result: org.apache.spark.sql.DataFrame = [key: string, col2: string ... 2 more fields]

scala> result.show(false)
+----+----+----+-----------+
|key |col2|col3|epoch_ts   |
+----+----+----+-----------+
|key1| ...| ...| 1557678233|
|key1| ...| ...| 1557678234|
|key2| ...| ...| 1557678235|
|key2| ...| ...| 1557678240|
|key2| ...| ...| 1557678271|
+----+----+----+-----------+

You can use the above code on each RDD of Kafka messages. Hope this is helpful.
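
For example, a minimal sketch of applying this per micro-batch, assuming stream is a DStream[String] of comma-separated messages (the variable names here are assumptions, not part of the answer):

    import org.apache.spark.sql.Row

    // For each micro-batch, rebuild the temp view and re-run the two SQL steps above
    stream.foreachRDD { batch =>
      val rowRdd = batch.map(line => Row.fromSeq(line.split(",", -1).map(_.trim)))
      spark.createDataFrame(rowRdd, schema).createOrReplaceTempView("kafka_messages")
      // ...run the "action" and "result" queries from above on this batch...
    }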

Upvotes: 0

user11683567

Reputation: 21

Is it possible? Without a doubt. Apache Beam, which among other runners provides an Apache Spark backend, can easily handle such operations.

However, that's definitely not something you want to implement yourself, unless you have significant development resources and a lot of know-how at your disposal. And if you had those, you probably wouldn't be asking this question in the first place.

Handling late events and out-of-order events, and recovering from node failures, can be tricky at best, with a large number of edge cases.

Furthermore, it would be obsolete before you actually implemented it: DStream is already considered a legacy API, and is likely to reach its end-of-life sooner rather than later. At the same time, Structured Streaming can already handle event-time windowing out of the box.
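
For illustration, a minimal Structured Streaming sketch of event-time windowing (column names taken from the question's message structure; the 5-minute window and 10-minute watermark are assumptions):

    import org.apache.spark.sql.functions.{col, from_unixtime, window}

    // df is a streaming DataFrame with columns key and epoch_ts (epoch seconds)
    val windowed = df
      .withColumn("event_time", from_unixtime(col("epoch_ts")).cast("timestamp"))
      .withWatermark("event_time", "10 minutes")              // tolerate late events
      .groupBy(col("key"), window(col("event_time"), "5 minutes"))
      .count()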

Upvotes: 2
