Reputation: 25367
I have a data set in the following format:
import numpy as np
import pandas as pd
# Create the data set
np.random.seed(42)
records = list()
for i in range(2):
    for j in range(2):
        for k in range(500):
            t = np.random.randint(pd.Timestamp('2000-01-01').value, pd.Timestamp('2018-01-01').value)
            if np.random.rand() > .95: continue
            ts = pd.Timestamp(t).strftime('%Y-%m-%d %H:%M:%S.%f')
            records.append( (i, j, np.random.rand(), ts) )
df = pd.DataFrame.from_records(records)
df.columns =['a_id', 'b_id', 'value', 'time']
Which looks like this:
   a_id  b_id     value                        time
0     0     0  0.156019  2007-09-28 15:12:24.260596
1     0     0  0.601115  2015-09-08 01:59:18.043399
2     0     0  0.969910  2012-01-10 07:51:29.662492
3     0     0  0.181825  2011-08-28 19:58:33.281289
4     0     0  0.524756  2015-11-15 14:18:17.398715
5     0     0  0.611853  2015-01-07 23:44:37.034322
6     0     0  0.366362  2008-06-21 11:56:10.529679
7     0     0  0.199674  2010-11-08 18:24:18.794838
8     0     0  0.046450  2008-04-27 02:36:46.026876
Here a_id and b_id together form the key for a sensor. This means the data frame has to be transformed like this:
df_ = pd.pivot_table(df, index='time', columns=['a_id', 'b_id'], values='value')
df_.index = [pd.to_datetime(v) for v in df_.index]
df_ = df_.resample('1W').mean().ffill().bfill()
After resampling and filling the gaps, the data is in the desired format:
a_id               0                   1
b_id               0         1         0         1
2000-01-09  0.565028  0.560434  0.920740  0.458825
2000-01-16  0.565028  0.146963  0.920740  0.217588
2000-01-23  0.565028  0.840872  0.920740  0.209690
2000-01-30  0.565028  0.046852  0.920740  0.209690
2000-02-06  0.565028  0.046852  0.704871  0.209690
Each column now contains the data of one sensor.
The problem is, I do not know how to do this in PySpark.
from pyspark.sql import functions as F

df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_utc_timestamp('time', '%Y-%m-%d %H:%M:%S.%f'))
df_test.printSchema()
This gives:
root
|-- a_id: long (nullable = true)
|-- b_id: long (nullable = true)
|-- value: double (nullable = true)
|-- time: timestamp (nullable = true)
How can I transform df_test such that it has the same form as df_?
Upvotes: 2
Views: 5640
Reputation: 236
You can "resample" your data using pyspark.ml.feature.Bucketizer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import functions as F

# Truncate timestamp to day precision, and convert to unixtime
df = df.withColumn("tt", F.unix_timestamp(F.date_trunc("day", "time")))
df.show(5)
# +----+----+--------------------+--------------------+---+----------+
# |a_id|b_id|               value|                time| id|        tt|
# +----+----+--------------------+--------------------+---+----------+
# |   0|   0| 0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|
# |   0|   0|  0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|
# |   0|   0|  0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|
# |   0|   0| 0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|
# |   0|   0|  0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|
# +----+----+--------------------+--------------------+---+----------+
# Get the minimum and maximum dates
tmin = df.select(F.min("tt")).collect()[0][0]
tmax = df.select(F.max("tt")).collect()[0][0]
# Get the number of seconds in a week
week = 60 * 60 * 24 * 7
# Get a list of bucket splits (add infinity for last split if weeks don't evenly divide)
splits = list(range(tmin, tmax, week)) + [float("inf")]
# Create bucket and bucket your data
bucketizer = Bucketizer(inputCol="tt", outputCol="weeks", splits=splits)
bucketed_df = bucketizer.transform(df)
bucketed_df.show(5)
# +----+----+-------------------+--------------------+---+----------+-----+
# |a_id|b_id|              value|                time| id|        tt|weeks|
# +----+----+-------------------+--------------------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|827.0|
# +----+----+-------------------+--------------------+---+----------+-----+
# Convert the buckets to a timestamp (seconds in week * bucket value + min_date)
bucketed_df = bucketed_df.withColumn(
    "time",
    F.from_unixtime(F.col("weeks") * week + tmin).cast("date"))
bucketed_df.show(5)
# +----+----+-------------------+----------+---+----------+-----+
# |a_id|b_id|              value|      time| id|        tt|weeks|
# +----+----+-------------------+----------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-24| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-07| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-09| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-22| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-09| 00|1447574400|827.0|
# +----+----+-------------------+----------+---+----------+-----+
# Finally, do the groupBy and pivot as already explained
# (I already concatenated "a_id" and "b_id" into the column "id".)
final_df = bucketed_df.groupBy("time").pivot("id").agg(F.avg("value"))
final_df.show(10)
# +----------+--------------------+--------------------+-------------------+-------------------+
# |      time|                  00|                  01|                 10|                 11|
# +----------+--------------------+--------------------+-------------------+-------------------+
# |2015-03-09|0.045227288910538066|  0.8633336495718252| 0.8229838050417675|               null|
# |2000-07-03|                null|                null| 0.7855315583735368|               null|
# |2013-09-09|  0.6334037565104235|                null|0.14284196481433187|               null|
# |2005-06-06|                null|  0.9095933818175037|               null|               null|
# |2017-09-11|                null|  0.9684887775943838|               null|               null|
# |2004-02-23|                null|  0.3782888656818202|               null|0.26674411859262276|
# |2004-07-12|                null|                null| 0.2528581182501112| 0.4189697737795244|
# |2000-12-25|                null|                null| 0.5473347601436167|               null|
# |2016-04-25|                null|  0.9918099513493635|               null|               null|
# |2016-10-03|                null|0.057844449447160606| 0.2770125243259788|               null|
# +----------+--------------------+--------------------+-------------------+-------------------+
That will get you almost all the way to what you need. Unfortunately, implementing the equivalent of the pandas.DataFrame.ffill() and pandas.DataFrame.bfill() methods is not nearly as easy in PySpark as it is in pandas, due to the distributed nature of the data. See here and here for suggestions.
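If an approximation is acceptable, a forward and backward fill can be emulated with window functions. The following is only a minimal sketch (it is not part of the answer above), assuming the pivoted final_df from the previous step; note that the unpartitioned windows pull all rows onto a single partition, which is only practical for data that has already been aggregated down to a moderate size.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Forward-fill, then backward-fill every sensor column of final_df, ordered by time.
w_ffill = Window.orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
w_bfill = Window.orderBy("time").rowsBetween(Window.currentRow, Window.unboundedFollowing)

filled_df = final_df
for c in [c for c in final_df.columns if c != "time"]:
    filled_df = filled_df.withColumn(c, F.last(F.col(c), ignorenulls=True).over(w_ffill))   # ffill
    filled_df = filled_df.withColumn(c, F.first(F.col(c), ignorenulls=True).over(w_bfill))  # bfill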
Upvotes: 0
Reputation: 40360
As mentioned in the comments, here is a solution to pivot your data:
You should concat your columns a_id and b_id under a new column c_id, group by date, then pivot on c_id and aggregate the values as you see fit; a sketch is shown below.
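A minimal sketch of that idea, assuming the df_test DataFrame from the question (the column names c_id and date and the F.avg aggregation are just for illustration):

from pyspark.sql import functions as F

# Build the combined sensor key c_id, group by calendar date, and pivot on c_id.
pivoted = (df_test
           .withColumn("c_id", F.concat_ws("_", "a_id", "b_id"))
           .withColumn("date", F.to_date("time"))
           .groupBy("date")
           .pivot("c_id")
           .agg(F.avg("value")))

pivoted.orderBy("date").show(5)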
As for resampling, I'd point you to the solution provided by @zero323 here.
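For reference, that kind of resampling essentially amounts to flooring the unix timestamp to a fixed interval and grouping on the result. A rough sketch with a one-week interval (not the linked code verbatim):

from pyspark.sql import functions as F

# Floor each timestamp to the start of its one-week interval and use it as the group key.
week = 7 * 24 * 60 * 60  # interval length in seconds
resampled = (df_test
             .withColumn("week_start",
                         (F.floor(F.col("time").cast("long") / week) * week).cast("timestamp"))
             .groupBy("week_start", "a_id", "b_id")
             .agg(F.avg("value").alias("value")))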
Upvotes: 1