Ivan
Ivan

Reputation: 20101

Group by a Pyspark Dataframe by time interval

I have a data frame with timestamps generated for it:

 from pyspark.sql.functions import avg, first

 rdd = sc.parallelize(
[
    (0, "A", 223,"201603_170302", "PORT"), 
    (0, "A", 22,"201602_100302", "PORT"), 
    (0, "A", 422,"201601_114300", "DOCK"), 
    (1,"B", 3213,"201602_121302", "DOCK")
]
)
 df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

so I can generate a datetime:

 dt_parse = udf(lambda x: datetime.strptime(x,"%Y%m%d_%H%M%S")
 df_data = df_data.withColumn('datetime', dt_parse(df_data.date))

But now I need to group by intervals of 6 hours, per day. Per hour would be something on the lines of

 df_data.groupby(hour(df_data.datetime)).agg(count(ship).alias(ship)).show()

But this wouldn't work for other intervals than hour. Is there a way to do it?

Upvotes: 6

Views: 4183

Answers (1)

Serge
Serge

Reputation: 2006

This works for me.

import pyspark.sql.functions

# ...

interval = 60 * 60 * 6    # 6 hours
gdf = dataframe.withColumn(
    'time_interval',
    pyspark.sql.functions.from_unixtime(pyspark.sql.functions.floor(pyspark.sql.functions.unix_timestamp(dataframe[obj['field']]) / interval) * interval)
).groupBy('time_interval')
# and then something like gdf.agg(...); gdf.collect()

Upvotes: 3

Related Questions