Reputation: 95
I have the following dataframe as input
+----+-------------------+
| id | date|
+----+-------------------+
| A|2016-03-11 09:00:00|
| B|2016-03-11 09:00:07|
| C|2016-03-11 09:00:18|
| D|2016-03-11 09:00:21|
| E|2016-03-11 09:00:39|
| F|2016-03-11 09:00:44|
| G|2016-03-11 09:00:49|
+----+-------------------+
and I would like to partition it into 10-second windows that ignore intervals of time larger than the window itself (basically as if restarting the counter).
Here's an example of the expected output:
+----+-------------------+-----+
| id | date|group|
+----+-------------------+-----+
| A|2016-03-11 09:00:00| 1 |
| B|2016-03-11 09:00:07| 1 |
| C|2016-03-11 09:00:18| 2 |
| D|2016-03-11 09:00:21| 2 |
| E|2016-03-11 09:00:39| 3 |
| F|2016-03-11 09:00:44| 3 |
| G|2016-03-11 09:00:49| 4 |
+----+-------------------+-----+
As opposed to a fixed time slicing, which would group them like so:
+----+-------------------+-----+
| id | date|group|
+----+-------------------+-----+
| A|2016-03-11 09:00:00| 1 |
| B|2016-03-11 09:00:07| 1 |
| C|2016-03-11 09:00:18| 2 |
| D|2016-03-11 09:00:21| 3 |
| E|2016-03-11 09:00:39| 4 |
| F|2016-03-11 09:00:44| 5 |
| G|2016-03-11 09:00:49| 5 |
+----+-------------------+-----+
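To make the rule concrete, here is a plain-Python sketch of the grouping logic I have in mind (just the rule, not a Spark solution; the rows are assumed to be already sorted by date). Note that G, which is exactly 10 seconds after E, starts a new group in my expected output, hence the >= comparison:

from datetime import datetime

rows = [('A', '2016-03-11 09:00:00'), ('B', '2016-03-11 09:00:07'),
        ('C', '2016-03-11 09:00:18'), ('D', '2016-03-11 09:00:21'),
        ('E', '2016-03-11 09:00:39'), ('F', '2016-03-11 09:00:44'),
        ('G', '2016-03-11 09:00:49')]

group, window_start = 0, None
for rid, ts in rows:
    t = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
    # a record at or beyond 10 seconds from the current window's start opens a new window
    if window_start is None or (t - window_start).total_seconds() >= 10:
        window_start = t
        group += 1
    print(rid, ts, group)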
I tried looking for solutions with the functions rowsBetween and rangeBetween, or with some variations of time difference, but I couldn't find the proper solution.
It's probably due to the fact that I don't know the proper terminology for this kind of windowing (it's neither tumbling nor rolling).
The closest I got was using the window function, but it only produces the fixed time slices shown above rather than the grouping I want.
Here's the code I tried (which uses the original df with an additional 'val' column of random values):
from pyspark.sql.functions import window, sum

w = df.groupBy(window("date", "10 seconds")).agg(sum("val").alias("sum"))
w.select(w.window.start.cast("string").alias("start"),
         w.window.end.cast("string").alias("end"),
         "sum").collect()
Any help would be greatly appreciated.
Upvotes: 2
Views: 668
Reputation: 3686
You can do it, but it is expensive: the approach needs multiple window functions, some of them without partitioning, which pulls all rows onto a single partition and gives up Spark's parallelism. To achieve it regardless:
I've left in the intermediate columns for clarity.
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, sum, when, min, floor, dense_rank

data = [
    ('B', '2016-03-11 09:00:07'),
    ('C', '2016-03-11 09:00:18'),
    ('A', '2016-03-11 09:00:00'),
    ('D', '2016-03-11 09:00:21'),
    ('E', '2016-03-11 09:00:39'),
    ('F', '2016-03-11 09:00:44'),
    ('G', '2016-03-11 09:00:49')
]
df = spark.createDataFrame(data, ['id', 'date'])
df = df.withColumn('date', df['date'].astype('timestamp'))

w = Window.orderBy('date')                          # global ordering, no partitioning
w2 = Window.partitionBy('group1').orderBy('date')   # within each gap-based session
w3 = Window.orderBy('group1', 'group2')             # final ranking over both levels

df = (df
      # seconds elapsed since the previous row
      .withColumn('diff', col('date').astype('long') - lag(col('date').astype('long')).over(w))
      # the first row has no previous row, so treat its gap as 0
      .withColumn('diff2', when(col('diff').isNull(), 0).otherwise(col('diff')))
      # flag rows whose gap to the previous row exceeds 10 seconds
      .withColumn('diff3', (col('diff2') > 10).astype('int'))
      # running sum of the flags gives a session id
      .withColumn('group1', sum('diff3').over(w))
      # 10-second slice index within each session, counted from the session start
      .withColumn('group2', floor((col('date').astype('long') - min(col('date').astype('long')).over(w2)) / 10))
      # dense_rank over (session, slice) yields consecutive group numbers
      .withColumn('group3', dense_rank().over(w3))
)
# output
+---+-------------------+----+-----+-----+------+------+------+
|id |date |diff|diff2|diff3|group1|group2|group3|
+---+-------------------+----+-----+-----+------+------+------+
|A |2016-03-11 09:00:00|null|0 |0 |0 |0 |1 |
|B |2016-03-11 09:00:07|7 |7 |0 |0 |0 |1 |
|C |2016-03-11 09:00:18|11 |11 |1 |1 |0 |2 |
|D |2016-03-11 09:00:21|3 |3 |0 |1 |0 |2 |
|E |2016-03-11 09:00:39|18 |18 |1 |2 |0 |3 |
|F |2016-03-11 09:00:44|5 |5 |0 |2 |0 |3 |
|G |2016-03-11 09:00:49|5 |5 |0 |2 |1 |4 |
+---+-------------------+----+-----+-----+------+------+------+
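If you only need the final grouping in the question's format, you can drop the intermediate columns afterwards (naming the result column 'group' here just to match the expected output):

result = df.select('id', 'date', col('group3').alias('group'))
result.show(truncate=False)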
Upvotes: 1