Ravi Teja

Reputation: 55

Partition by 24 hours and aggregate using PySpark or pandas

I have session data for each device, as below:

time-started: timestamp when the device connected

duration (seconds): how long it stayed connected

packets: how many packets were sent

Now I need to aggregate (sum) duration and packets for each device, partitioned into 24-hour windows.

For example:

For the first record: device A started at April 8th, 1:53 AM, so I need to aggregate all device A records valid for 24 hours, i.e. up to April 9th, 1:53 AM.

The next record for device A then starts at April 9th, 4:27 AM, so I aggregate all device A records valid for the next 24 hours, i.e. up to April 10th, 4:27 AM.

And so on for each device.


Expected output: [image]

Test data:

dftest = sc.parallelize([
    ['A', '2020-04-08T01:53:54.932000', 'Org1', 'wifi', 60, 372717],
    ['A', '2020-04-08T02:40:38.661000', 'Org1', 'wifi', 194, 819040],
    ['A', '2020-04-08T21:45:10.207000', 'Org1', 'wifi', 8885, 3449150],
    ['A', '2020-04-09T00:15:28.838000', 'Org1', 'wifi', 14770, 3572589],
    ['A', '2020-04-09T04:27:33.424000', 'Org1', 'remote', 0, 0],
    ['A', '2020-04-09T04:29:25.189000', 'Org1', 'wifi', 60, 7495],
    ['A', '2020-04-09T04:44:21.397000', 'Org1', 'remote', 60, 553356],
    ['A', '2020-04-09T04:50:40.406000', 'Org1', 'wifi', 60, 662467],
    ['A', '2020-04-10T00:00:50.636000', 'Org1', 'remote', 0, 72],
    ['A', '2020-04-10T04:20:28.831000', 'Org1', 'remote', 6, 497],
    ['A', '2020-04-10T04:31:35.336000', 'Org1', 'remote', 0, 22],
    ['B', '2020-04-08T21:56:58.502000', 'Org2', 'remote', 0, 0],
    ['B', '2020-04-08T22:01:19.534000', 'Org2', 'wifi', 0, 0],
    ['B', '2020-04-08T22:10:15.891000', 'Org2', 'wifi', 60, 187891],
    ['B', '2020-04-08T22:16:41.499000', 'Org2', 'wifi', 1620, 207674],
    ['B', '2020-04-09T01:55:02.772000', 'Org2', 'wifi', 360, 426232],
    ['B', '2020-04-09T02:03:32.735000', 'Org2', 'wifi', 60, 374827],
    ['B', '2020-04-09T02:06:16.509000', 'Org2', 'wifi', 60, 386518],
    ['B', '2020-04-09T02:13:33.497000', 'Org2', 'remote', 60, 373609],
    ['B', '2020-04-09T02:17:19.176000', 'Org2', 'wifi', 133, 400417],
    ['B', '2020-04-10T23:10:15.654000', 'Org2', 'remote', 0, 212],
    ['B', '2020-04-10T23:10:41.749000', 'Org2', 'remote', 1, 285]
]).toDF(("deviceId", "time-started", "OrgId", "type", "duration", "packet"))
dftest.show()
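
For reference, the segmentation rule described above can be expressed in plain pandas (a minimal sketch only, assuming the data fits in memory; segment and window_start are illustrative names, and the OrgId/type columns are dropped for brevity):

import pandas as pd

pdf = dftest.toPandas()
pdf["time-started"] = pd.to_datetime(pdf["time-started"])
pdf = pdf.sort_values(["deviceId", "time-started"])

def segment(group):
    # A new 24-hour window starts at the first session that falls
    # outside the previous window; then sum duration/packet per window.
    starts, start = [], None
    for t in group["time-started"]:
        if start is None or t >= start + pd.Timedelta(days=1):
            start = t
        starts.append(start)
    return (group.assign(window_start=starts)
                 .groupby("window_start", as_index=False)[["duration", "packet"]]
                 .sum())

result = (pdf.groupby("deviceId")
             .apply(segment)
             .reset_index(level=0)
             .reset_index(drop=True))
print(result)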

Upvotes: 2

Views: 700

Answers (1)

murtihash

Reputation: 8410

In your case, each next 24-hour window depends on where the previous one ended, so this logic can't be expressed with window functions alone. I decoupled the sum computation from pandas (it would be slow there) and used Spark's built-in functions to compute the sums; the pandas UDF then gives us the desired date groups, and we filter on them to get the desired result.

Iteration is the only way to get your 24-hour segments, so you could use a simple UDF too, but a pandas vectorized UDF lets us express the logic on small groups (keyed by deviceId), so it should perform better. Pandas grouped-map UDF (Spark 2.3+):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = dftest  # the test data from the question

# Rolling 24-hour window per device: for each row, sum everything from
# that row up to 24 hours (86400 seconds) later.
w = Window().partitionBy("deviceId").orderBy(F.col("time-started").cast("long"))\
            .rangeBetween(Window.currentRow, 24 * 60 * 60)

df2 = df.withColumn("time-started", F.to_timestamp("time-started", "yyyy-MM-dd'T'HH:mm:ss"))\
        .withColumn("time-started-2", F.col("time-started"))\
        .withColumn("duration", F.sum("duration").over(w))\
        .withColumn("packet", F.sum("packet").over(w))

@pandas_udf(df2.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
    # Collapse each row's timestamp onto the start of its 24-hour group:
    # a new group begins once a row falls outside the previous window.
    # Sort first, since row order within a group is not guaranteed.
    df1 = df1.sort_values("time-started").reset_index(drop=True)
    start = df1.loc[0, 'time-started']
    for i in range(1, len(df1)):
        if start + pd.Timedelta(days=1) > df1.loc[i, 'time-started']:
            df1.loc[i, 'time-started'] = start
        else:
            start = df1.loc[i, 'time-started']
    return df1

# Keep only the rows that start a group; their precomputed window sums
# are exactly the per-24-hour totals.
df2.groupby('deviceId').apply(grouped_map)\
   .filter(F.col("time-started-2") == F.col("time-started"))\
   .drop("time-started-2")\
   .orderBy("deviceId")\
   .show()

#+--------+-------------------+-----+------+--------+-------+
#|deviceId|       time-started|OrgId|  type|duration| packet|
#+--------+-------------------+-----+------+--------+-------+
#|       A|2020-04-08 01:53:54| Org1|  wifi|   23909|8213496|
#|       A|2020-04-09 04:27:33| Org1|remote|     186|1223887|
#|       A|2020-04-10 04:31:35| Org1|remote|       0|     22|
#|       B|2020-04-08 21:56:58| Org2|remote|    2353|2357168|
#|       B|2020-04-10 23:10:15| Org2|remote|       1|    497|
#+--------+-------------------+-----+------+--------+-------+
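
As for the simple (non-vectorized) UDF mentioned above, a rough Python sketch of that idea collects each device's sessions into a list and replays the same iteration (segment_24h, the collected-struct layout, and the dropped type column are my own illustration, not part of the original answer):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType
from datetime import datetime, timedelta

# Illustrative output schema: one entry per 24-hour segment.
seg_schema = ArrayType(StructType([
    StructField("time-started", StringType()),
    StructField("duration", LongType()),
    StructField("packet", LongType()),
]))

@F.udf(seg_schema)
def segment_24h(sessions):
    # Replay the same iteration: a new segment starts once a session
    # falls outside the previous segment's 24-hour window.
    out, start = [], None
    for r in sorted(sessions, key=lambda s: s["time-started"]):
        t = datetime.strptime(r["time-started"][:19], "%Y-%m-%dT%H:%M:%S")
        if start is None or t >= start + timedelta(days=1):
            start = t
            out.append([r["time-started"], 0, 0])
        out[-1][1] += r["duration"]
        out[-1][2] += r["packet"]
    return [tuple(g) for g in out]

(dftest
 .groupBy("deviceId", "OrgId")
 .agg(F.collect_list(F.struct("time-started", "duration", "packet")).alias("sessions"))
 .withColumn("seg", F.explode(segment_24h("sessions")))
 .select("deviceId",
         F.col("seg").getField("time-started").alias("time-started"),
         "OrgId",
         F.col("seg").getField("duration").alias("duration"),
         F.col("seg").getField("packet").alias("packet"))
 .orderBy("deviceId", "time-started")
 .show(truncate=False))

This keeps the work on the executors but serializes row by row through Python, which is why the vectorized grouped-map UDF above should scale better.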

You can also take a look at a similar question; the solution proposed there was a Scala UDF using foldLeft. I think the pandas grouped-map UDF is a better alternative.

Upvotes: 1
