Reputation: 25
I would like to know whether it is possible, using PySpark, to calculate the time difference of a dataset by group. For example, I have
CODE1 | CODE2 | TIME
00001 | AAA | 2019-01-01 14:00:00
00001 | AAA | 2019-01-01 14:05:00
00001 | AAA | 2019-01-01 14:10:00
00001 | BBB | 2019-01-01 14:15:00
00001 | BBB | 2019-01-01 14:20:00
00001 | AAA | 2019-01-01 14:25:00
00001 | AAA | 2019-01-01 14:30:00
What I would like is something like
CODE1 | CODE2 | TIME_DIFF
00001 | AAA | 10 MINUTES
00001 | BBB | 5 MINUTES
00001 | AAA | 5 MINUTES
The time difference is from the last record to the first one within each consecutive run of the same category. I have already sorted the information by time. Is it possible?
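For reproducibility, the sample data can be built with something like this (TIME kept as a plain string here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        ('00001', 'AAA', '2019-01-01 14:00:00'),
        ('00001', 'AAA', '2019-01-01 14:05:00'),
        ('00001', 'AAA', '2019-01-01 14:10:00'),
        ('00001', 'BBB', '2019-01-01 14:15:00'),
        ('00001', 'BBB', '2019-01-01 14:20:00'),
        ('00001', 'AAA', '2019-01-01 14:25:00'),
        ('00001', 'AAA', '2019-01-01 14:30:00'),
    ],
    ['CODE1', 'CODE2', 'TIME'],
)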
Upvotes: 0
Views: 288
Reputation: 1588
I have coded it with a fairly straightforward approach; the steps below could likely be optimized further using more of Spark's built-in functions.
>>> df.show()
+-----+-----+-------------------+
|CODE1|CODE2|               TIME|
+-----+-----+-------------------+
|    1|  AAA|2019-01-01 14:00:00|
|    1|  AAA|2019-01-01 14:05:00|
|    1|  AAA|2019-01-01 14:10:00|
|    1|  BBB|2019-01-01 14:15:00|
|    1|  BBB|2019-01-01 14:20:00|
|    1|  AAA|2019-01-01 14:25:00|
|    1|  AAA|2019-01-01 14:30:00|
+-----+-----+-------------------+
>>> df.printSchema()
root
 |-- CODE1: long (nullable = true)
 |-- CODE2: string (nullable = true)
 |-- TIME: string (nullable = true)
>>> from pyspark.sql import functions as F, Window
>>> win = Window.partitionBy(F.lit(0)).orderBy('TIME')
#batch_order groups consecutive CODE2 values as per the ordered timestamp
>>> df_1=df.withColumn('prev_batch', F.lag('CODE2').over(win)) \
... .withColumn('flag', F.when(F.col('CODE2') == F.col('prev_batch'),0).otherwise(1)) \
... .withColumn('batch_order', F.sum('flag').over(win)) \
... .drop('prev_batch', 'flag') \
... .sort('TIME')
>>> df_1.show()
+-----+-----+-------------------+-----------+
|CODE1|CODE2|               TIME|batch_order|
+-----+-----+-------------------+-----------+
|    1|  AAA|2019-01-01 14:00:00|          1|
|    1|  AAA|2019-01-01 14:05:00|          1|
|    1|  AAA|2019-01-01 14:10:00|          1|
|    1|  BBB|2019-01-01 14:15:00|          2|
|    1|  BBB|2019-01-01 14:20:00|          2|
|    1|  AAA|2019-01-01 14:25:00|          3|
|    1|  AAA|2019-01-01 14:30:00|          3|
+-----+-----+-------------------+-----------+
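A side note: partitioning the window by a constant (F.lit(0)) sends every row to a single window partition. If the real data contains several CODE1 values, the same run detection can be partitioned by CODE1 instead; this is only a sketch under that assumption, and it still relies on TIME sorting correctly as a 'yyyy-MM-dd HH:mm:ss' string:

# Same run-detection logic, but each CODE1 is processed independently
win_c1 = Window.partitionBy('CODE1').orderBy('TIME')

df_1_alt = df.withColumn('prev_batch', F.lag('CODE2').over(win_c1)) \
             .withColumn('flag', F.when(F.col('CODE2') == F.col('prev_batch'), 0).otherwise(1)) \
             .withColumn('batch_order', F.sum('flag').over(win_c1)) \
             .drop('prev_batch', 'flag')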
#Extract min and max timestamps for each group
>>> df_max=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.max("TIME").alias("mx"))
>>> df_min=df_1.groupBy([df_1.batch_order,df_1.CODE2]).agg(F.min("TIME").alias("mn"))
>>> df_max.show()
+-----------+-----+-------------------+
|batch_order|CODE2|                 mx|
+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:10:00|
|          2|  BBB|2019-01-01 14:20:00|
|          3|  AAA|2019-01-01 14:30:00|
+-----------+-----+-------------------+
>>> df_min.show()
+-----------+-----+-------------------+
|batch_order|CODE2|                 mn|
+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:00:00|
|          2|  BBB|2019-01-01 14:15:00|
|          3|  AAA|2019-01-01 14:25:00|
+-----------+-----+-------------------+
#join on batch_order
>>> df_joined=df_max.join(df_min,df_max.batch_order==df_min.batch_order)
>>> df_joined.show()
+-----------+-----+-------------------+-----------+-----+-------------------+
|batch_order|CODE2|                 mx|batch_order|CODE2|                 mn|
+-----------+-----+-------------------+-----------+-----+-------------------+
|          1|  AAA|2019-01-01 14:10:00|          1|  AAA|2019-01-01 14:00:00|
|          3|  AAA|2019-01-01 14:30:00|          3|  AAA|2019-01-01 14:25:00|
|          2|  BBB|2019-01-01 14:20:00|          2|  BBB|2019-01-01 14:15:00|
+-----------+-----+-------------------+-----------+-----+-------------------+
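Note that the join duplicates the batch_order and CODE2 columns. As a sketch of an alternative, both endpoints can be computed in a single aggregation, which removes the need for df_max, df_min and the join altogether:

# One groupBy produces both the earliest and latest TIME per run
df_bounds = df_1.groupBy('batch_order', 'CODE2') \
                .agg(F.min('TIME').alias('mn'), F.max('TIME').alias('mx'))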
>>> from pyspark.sql.functions import unix_timestamp
>>> from pyspark.sql.types import IntegerType
#Difference in minutes between the max and min timestamps
>>> df_joined.withColumn("diff",((unix_timestamp(df_joined.mx, 'yyyy-MM-dd HH:mm:ss')-unix_timestamp(df_joined.mn, 'yyyy-MM-dd HH:mm:ss'))/60).cast(IntegerType())).show()
+-----------+-----+-------------------+-------------------+----+
|batch_order|CODE2|                 mx|                 mn|diff|
+-----------+-----+-------------------+-------------------+----+
|          1|  AAA|2019-01-01 14:10:00|2019-01-01 14:00:00|  10|
|          3|  AAA|2019-01-01 14:30:00|2019-01-01 14:25:00|   5|
|          2|  BBB|2019-01-01 14:20:00|2019-01-01 14:15:00|   5|
+-----------+-----+-------------------+-------------------+----+
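As hinted above, the same logic can also be collapsed into one chain that yields the requested minute differences directly. This is only a sketch reusing the win window defined earlier, not a separate method:

# Run detection, per-run min/max as unix timestamps, then the gap in minutes
result = (
    df.withColumn('prev_batch', F.lag('CODE2').over(win))
      .withColumn('flag', F.when(F.col('CODE2') == F.col('prev_batch'), 0).otherwise(1))
      .withColumn('batch_order', F.sum('flag').over(win))
      .groupBy('CODE1', 'CODE2', 'batch_order')
      .agg(
          F.min(F.unix_timestamp('TIME', 'yyyy-MM-dd HH:mm:ss')).alias('mn'),
          F.max(F.unix_timestamp('TIME', 'yyyy-MM-dd HH:mm:ss')).alias('mx'))
      .withColumn('TIME_DIFF', ((F.col('mx') - F.col('mn')) / 60).cast('int'))
      .orderBy('batch_order')
      .select('CODE1', 'CODE2', 'TIME_DIFF'))
result.show()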
Upvotes: 1