Reputation: 47
I need to join two Spark dataframes on a timestamp column. The problem is that they have different frequencies: the first dataframe (df1) has an observation every 10 minutes, while the second one (df2) is sampled at 25 Hz (25 observations per second, which is 15,000 times more frequent than df1). Each dataframe has over 100 columns and millions of rows. To make a smooth join, I am trying to resample df1 to 25 Hz, forward-fill the null values caused by resampling, and then join the dataframes once they are at the same frequency. The dataframes are too big, which is why I'm trying to use Spark instead of pandas.
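For clarity, here is roughly how I picture the forward-fill step once the resampled frame exists (just a sketch; df1_resampled and some_column are placeholders, and I realize a global window like this pushes everything through a single partition):

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# Forward-fill a single (placeholder) column: take the last non-null value
# seen so far when rows are ordered by timestamp.
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)

df1_filled = df1_resampled.withColumn(
    "some_column",
    last(col("some_column"), ignorenulls=True).over(w)
)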
So, here is the question: let's say I have the following Spark dataframe:
I want to resample it to 25 Hz (25 observations per second), so that it looks like this:
How can I do that efficiently in PySpark?
Note:
I tried to resample my df1 using the code from an earlier question (PySpark: how to resample frequencies) as below:
from pyspark.sql.functions import col, max as max_, min as min_
freq = x # x is the frequency in seconds
epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq
with_epoch = df1.withColumn("dummy", epoch)
min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()
new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")
new_df = (new_df.join(with_epoch, "dummy", "left")
                .orderBy("dummy")
                .withColumn("timestamp_resampled", col("dummy").cast("timestamp")))
It seems the above code only works when the intended sampling interval is greater than or equal to one second. For example, when freq = 1, it produces the following table:
However, when I pass 25 Hz as the frequency (i.e. freq = 1/25), the code fails because the step in the spark.range function cannot be less than 1.
Is there a workaround to solve this issue? Or any other way to re-sample the frequency down to milliseconds?
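One idea I've been considering (not sure it is the right approach) is to do the range arithmetic in integer milliseconds, so that the step passed to spark.range stays a whole number (40 ms for 25 Hz). A rough sketch, reusing the names from the code above:

from pyspark.sql.functions import col, max as max_, min as min_

step_ms = 40  # 25 Hz -> one observation every 40 milliseconds

# Work in integer milliseconds so spark.range gets an integer step.
epoch_ms = (col("timestamp").cast("double") * 1000).cast("bigint")
with_epoch = df1.withColumn("dummy", epoch_ms - (epoch_ms % step_ms))

min_ms, max_ms = with_epoch.select(min_("dummy"), max_("dummy")).first()

grid = spark.range(min_ms, max_ms + 1, step_ms).toDF("dummy")

resampled = (grid.join(with_epoch, "dummy", "left")
                 .orderBy("dummy")
                 .withColumn("timestamp_resampled",
                             (col("dummy") / 1000).cast("timestamp")))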
Upvotes: 1
Views: 1459
Reputation: 2347
If your objective is to join the 2 dataframes, I'd suggest using an inner join directly:
df = df1.join(df2, df1.Timestamp == df2.Timestamp)
However, if you want to try to downsample the dataframe, you can convert the timestamp to milliseconds and keep the rows where mod(timestamp, 25) == 0. You can use this only if you are sure that the data is sampled perfectly.
from pyspark.sql.functions import col
df1 = df1.filter((col("Timestamp") % 25) == 0)
Another option is to number each row and keep one out of every 25, as sketched below. With this solution you reduce the rows without considering the timestamp. A further problem with this solution is that you need to sort the data, which is not efficient.
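A rough sketch of that row-numbering idea (the column name and the keep-1-in-25 step are placeholders; the global window is exactly what makes it inefficient, since it pushes all rows through a single partition):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Number the rows by timestamp and keep one out of every 25.
w = Window.orderBy("Timestamp")

df_downsampled = (df1.withColumn("rn", row_number().over(w))
                     .filter((col("rn") - 1) % 25 == 0)
                     .drop("rn"))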
P.S.: Premature optimization is the root of all evil.
Let's create a fake dataset full of epoch timestamps with millisecond resolution.
>>> import pyspark.sql.functions as F
>>> df = sqlContext.range(1559646513000, 1559646520000)\
        .select( (F.col('id')/1000).cast('timestamp').alias('timestamp'))
>>> df
DataFrame[timestamp: timestamp]
>>> df.show(5,False)
+-----------------------+
|timestamp |
+-----------------------+
|2019-06-04 13:08:33 |
|2019-06-04 13:08:33.001|
|2019-06-04 13:08:33.002|
|2019-06-04 13:08:33.003|
|2019-06-04 13:08:33.004|
+-----------------------+
only showing top 5 rows
Now, convert back to integers:
>>> df.select( (df.timestamp.cast('double')*1000).cast('bigint').alias('epoch') )\
.show(5, False)
+-------------+
|epoch |
+-------------+
|1559646513000|
|1559646513001|
|1559646513002|
|1559646513003|
|1559646513004|
+-------------+
only showing top 5 rows
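To tie the two snippets together, the same mod trick can then be applied to that epoch column. For example, keeping one row every 40 ms (i.e. 25 Hz) would look roughly like this sketch:

>>> period_ms = 40  # keep one row every 40 ms, i.e. 25 Hz
>>> df.withColumn('epoch', (df.timestamp.cast('double')*1000).cast('bigint'))\
      .filter((F.col('epoch') % period_ms) == 0)\
      .select('timestamp')\
      .show(5, False)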
Upvotes: 1