Given the dataset below, I would like to resample the timestamps in Spark at a fixed frequency (e.g. 5 minutes).
import datetime as dt
import pandas as pd
START_DATE = dt.datetime(2019,8,15,20,33,0)
test_df = pd.DataFrame({
'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
'user_id': [15,15,16,16,15,17,17,17,16,17],
'status': [0,1,1,1,0,1,0,1,1,0],
'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})
test_df.groupby(['school_id', 'class_id', 'user_id', 'start']).min()
However, I also want the resampling to cover a fixed window, from 2019-08-15 20:30:00 to 2019-08-15 21:00:00, so every group of school_id, class_id and user_id will have 6 entries, one per 5-minute bucket in that window.
The null entries generated by the resample should be populated by forward-fill.
I've used Pandas for the sample dataset, but the actual dataframe will be pulled in Spark, so the approach I'm looking for should be done in Spark as well.
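To make the expected result concrete, here is a minimal single-machine pandas sketch of the target output. It rests on assumptions not stated above: each start is floored to its 5-minute bucket, the last status in a bucket wins, and buckets before a group's first observation stay null because there is nothing to forward-fill from. The helper name resample_group is hypothetical.

```python
import datetime as dt
import pandas as pd

KEYS = ["school_id", "class_id", "user_id"]

START_DATE = dt.datetime(2019, 8, 15, 20, 33, 0)
test_df = pd.DataFrame({
    "school_id": ["remote", "remote", "remote", "remote", "onsite",
                  "onsite", "onsite", "onsite", "remote", "remote"],
    "class_id": ["green", "green", "red", "red", "green",
                 "green", "green", "green", "red", "green"],
    "user_id": [15, 15, 16, 16, 15, 17, 17, 17, 16, 17],
    "status": [0, 1, 1, 1, 0, 1, 0, 1, 1, 0],
    "start": pd.date_range(start=START_DATE, periods=10, freq="2min"),
})

# Six 5-minute buckets: 20:30, 20:35, ..., 20:55
grid = pd.date_range("2019-08-15 20:30:00", periods=6, freq="5min")


def resample_group(pdf, grid):
    """Hypothetical helper: put one group's status on the grid and forward-fill."""
    ordered = pdf.sort_values("start")
    # Assumption: floor each timestamp to the start of its 5-minute bucket
    buckets = ordered["start"].dt.floor("5min")
    # Assumption: when several rows share a bucket, keep the latest status
    last_per_bucket = ordered.groupby(buckets)["status"].last()
    # Reindex onto the full grid, then fill gaps from the previous bucket
    filled = last_per_bucket.reindex(grid).ffill()
    return filled.rename_axis("start").reset_index()


# Apply per group and re-attach the group keys
result = pd.concat(
    [resample_group(g, grid).assign(**dict(zip(KEYS, key)))
     for key, g in test_df.groupby(KEYS)],
    ignore_index=True,
)
print(result)
```

The result has 5 groups with 6 rows each; the Spark version should produce the same shape.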
I guess the approach might be similar to the one in "PySpark: how to resample frequencies", but I can't get it to work in this scenario.
Thanks for your help
Upvotes: 3
Views: 1619
Reputation: 1669
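This is probably not the best way to get the final result, but it shows the idea.
from datetime import datetime
import numpy as np
import pandas as pd
from pytz import timezone
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType
# Assumes an active SparkSession named `spark`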
# Create DataFrame
START_DATE = datetime(2019,8,15,20,33,0)
test_df = pd.DataFrame({
'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
'user_id': [15,15,16,16,15,17,17,17,16,17],
'status': [0,1,1,1,0,1,0,1,1,0],
'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})
# Convert TimeStamp to Integers
df = spark.createDataFrame(test_df)
print(df.dtypes)
df = df.withColumn('start', F.col('start').cast("bigint"))
df.show()
This outputs:
+---------+--------+-------+------+----------+
|school_id|class_id|user_id|status|     start|
+---------+--------+-------+------+----------+
|   remote|   green|     15|     0|1565915580|
|   remote|   green|     15|     1|1565915700|
|   remote|     red|     16|     1|1565915820|
|   remote|     red|     16|     1|1565915940|
|   onsite|   green|     15|     0|1565916060|
|   onsite|   green|     17|     1|1565916180|
|   onsite|   green|     17|     0|1565916300|
|   onsite|   green|     17|     1|1565916420|
|   remote|     red|     16|     1|1565916540|
|   remote|   green|     17|     0|1565916660|
+---------+--------+-------+------+----------+
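A side note on these epoch values: they are consistent with the naive pandas timestamps having been interpreted in US/Eastern (presumably the Spark session time zone in this run, which is an assumption). That would explain why the time grid further down is also localized to US/Eastern. A small pandas-only check, with illustrative variable names:

```python
import pandas as pd

# First 'start' value from the sample data, as a naive timestamp
first_start = pd.Timestamp("2019-08-15 20:33:00")

# Epoch seconds if the wall-clock time is read as US/Eastern vs. UTC
epoch_eastern = int(first_start.tz_localize("US/Eastern").timestamp())
epoch_utc = int(first_start.tz_localize("UTC").timestamp())

print(epoch_eastern)              # matches the first row of the table above
print(epoch_eastern - epoch_utc)  # offset between the two readings, in seconds
```

If your session uses a different time zone, the grid should be localized to that zone instead, otherwise the grid points and the 'start' values will not line up.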
# Create the time sequence needed
start = datetime.strptime('2019-08-15 20:30:00', '%Y-%m-%d %H:%M:%S')
eastern = timezone('US/Eastern')
start = eastern.localize(start)
times = pd.date_range(start = start, periods = 6, freq='5min')
times = [s.timestamp() for s in times]
print(times)
[1565915400.0, 1565915700.0, 1565916000.0, 1565916300.0, 1565916600.0, 1565916900.0]
# Use pandas_udf to create final DataFrame
schm = StructType(df.schema.fields + [StructField('epoch', IntegerType(), True)])
@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def resample(pdf):
    # One output row per grid point for this (school_id, class_id, user_id) group
    pddf = pd.DataFrame({'epoch': times})
    pddf['school_id'] = pdf['school_id'].iloc[0]
    pddf['class_id'] = pdf['class_id'].iloc[0]
    pddf['user_id'] = pdf['user_id'].iloc[0]
    # Index of the first grid point >= each 'start'
    res = np.searchsorted(times, pdf['start'])
    # Place the original 'start' values on the grid; unmatched points stay NaN
    arr = np.full(len(times), np.nan)
    arr[res] = pdf['start']
    pddf['start'] = arr
    # Same for 'status', using a fresh array
    arr = np.full(len(times), np.nan)
    arr[res] = pdf['status']
    pddf['status'] = arr
    return pddf
df = df.groupBy('school_id', 'class_id', 'user_id').apply(resample)
df = df.withColumn('timestamp', F.to_timestamp(df['epoch']))
df.show(60)
The final results:
+---------+--------+-------+------+----------+----------+-------------------+
|school_id|class_id|user_id|status|     start|     epoch|          timestamp|
+---------+--------+-------+------+----------+----------+-------------------+
|   remote|     red|     16|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|     red|     16|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|     red|     16|     1|1565915940|1565916000|2019-08-15 20:40:00|
|   remote|     red|     16|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|     red|     16|     1|1565916540|1565916600|2019-08-15 20:50:00|
|   remote|     red|     16|  null|      null|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     15|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     15|     0|1565916060|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     17|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     17|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     17|     0|1565916660|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     17|     1|1565916180|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     17|     1|1565916420|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     17|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     15|     0|1565915580|1565915700|2019-08-15 20:35:00|
|   remote|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     15|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
+---------+--------+-------+------+----------+----------+-------------------+
Now you get 6 timestamps for each group.
Note that not all of the original 'status' and 'start' values are mapped to the final DataFrame. The resample udf works on a 5-minute grid, so two 'start' times can map to the same grid point, and only one of them is kept. The remaining nulls are also not forward-filled here. Both can be tuned in the udf based on your frequency and how you want to keep the data.
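The collision mentioned above can be reproduced outside Spark with NumPy alone. This is a small sketch using the three 'start' values of the remote/red/16 group and the grid printed earlier; the variable names are illustrative only.

```python
import numpy as np

# The 5-minute grid printed earlier (epoch seconds)
times = [1565915400.0, 1565915700.0, 1565916000.0,
         1565916300.0, 1565916600.0, 1565916900.0]

# 'start' values of the remote/red/16 group
starts = np.array([1565915820, 1565915940, 1565916540])

# Index of the first grid point >= each start
idx = np.searchsorted(times, starts)
print(idx)  # the first two starts share one grid index

# Scatter the starts onto the grid; a repeated index keeps only one value
arr = np.full(len(times), np.nan)
arr[idx] = starts

kept = np.count_nonzero(~np.isnan(arr))
print(f"{len(starts)} input rows, {kept} grid points filled")
```

Comparing len(starts) with np.unique(idx).size is a cheap way to detect how many rows a given frequency would drop before deciding on an aggregation rule (first, last, min, and so on).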
Upvotes: 2