M. Mate

Reputation: 47

Pyspark: re-sampling frequencies down to milliseconds

I need to join two Spark dataframes on a timestamp column. The problem is that they have different frequencies: the first dataframe (df1) has an observation every 10 minutes, while the second one (df2) is 25 Hz (25 observations per second, which is 15000 times more frequent than df1). Each dataframe has over 100 columns and millions of rows. To make a smooth join, I am trying to resample df1 down to 25 Hz, forward-fill the null values caused by resampling, and then join the dataframes once they are at the same frequency. The dataframes are too big for pandas, which is why I'm trying to use Spark instead.
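(For reference, the forward-fill step I have in mind is roughly the following sketch; resampled_df, value_columns and the timestamp column name are placeholders:)

from pyspark.sql import Window
from pyspark.sql.functions import last

# carry the last non-null value forward along the resampled timeline
# note: no partitionBy, so Spark will pull everything into one partition
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
filled = resampled_df.select(
    "timestamp",
    *[last(c, ignorenulls=True).over(w).alias(c) for c in value_columns]
)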

So, here is the question: let's say, I have the following spark dataframe:

sample_df

I want to resample it down to 25 Hz (25 observations per second), so that it looks like this:

Expected_result

How can I do that efficiently in PySpark?

Note:

I tried to resample my df1 using the code from an earlier question (PySpark: how to resample frequencies) as below:

from pyspark.sql.functions import col, max as max_, min as min_

freq = x   # x is the frequency in seconds

# truncate each timestamp down to the nearest multiple of freq seconds
epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq

with_epoch = df1.withColumn("dummy", epoch)

min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

# build the full grid of epochs and left-join the original data onto it
new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")

(new_df.join(with_epoch, "dummy", "left").orderBy("dummy")
       .withColumn("timestamp_resampled", col("dummy").cast("timestamp")))

It seems the above code only works when the intended frequency is at least one second. For example, when freq = 1, it produces the following table:

undesired_result

However, when I pass 25 Hz as the frequency (i.e. freq = 1/25), the code fails, because the 'step' in the spark.range function cannot be less than 1.

Is there a workaround for this issue, or any other way to resample the frequency down to milliseconds?

Upvotes: 1

Views: 1459

Answers (1)

Daniel Argüelles

Reputation: 2347

If your objective is to join the 2 dataframes, I'd suggest using an inner join directly:

df = df1.join(df2, df1.Timestamp == df2.Timestamp)

However, if you want to try to downsample the dataframe, you can convert the timestamp to milliseconds and keep only the rows where mod(timestamp, 25) == 0. You can use this only if you are sure that the data is sampled perfectly.

from pyspark.sql.functions import col

df1 = df1.filter((col("Timestamp") % 25) == 0)

Another option is to number each row and keep one out of every 25. With this solution, you reduce the rows without considering the timestamp. Another problem with this solution is that you need to sort the data (not efficient).
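A rough sketch of that row-numbering idea, assuming df2 is the 25 Hz dataframe and Timestamp is its time column (names taken from the question):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# number every row in timestamp order (single window, so all data gets sorted)
w = Window.orderBy(col("Timestamp"))
numbered = df2.withColumn("rn", row_number().over(w))

# keep one row out of every 25 and drop the helper column
downsampled = numbered.filter((col("rn") - 1) % 25 == 0).drop("rn")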

PS: Premature optimization is the root of all evil

Edit: Timestamp to int

Let's create a fake dataset full of timestamps using epoch milliseconds.

>>> import pyspark.sql.functions as F
>>> df = sqlContext.range(1559646513000, 1559646520000)\
                   .select((F.col('id')/1000).cast('timestamp').alias('timestamp'))
>>> df
DataFrame[timestamp: timestamp]
>>> df.show(5,False)
+-----------------------+
|timestamp              |
+-----------------------+
|2019-06-04 13:08:33    |
|2019-06-04 13:08:33.001|
|2019-06-04 13:08:33.002|
|2019-06-04 13:08:33.003|
|2019-06-04 13:08:33.004|
+-----------------------+
only showing top 5 rows

Now, convert back to integers:

>>> df.select( (df.timestamp.cast('double')*1000).cast('bigint').alias('epoch') )\
      .show(5, False)
+-------------+
|epoch        |
+-------------+
|1559646513000|
|1559646513001|
|1559646513002|
|1559646513003|
|1559646513004|
+-------------+
only showing top 5 rows
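If you still want the range-based resampling from the question, the same trick keeps the spark.range step an integer: work in milliseconds, where 25 Hz means a 40 ms step. A rough sketch, reusing the names from the question (df1, spark):

from pyspark.sql.functions import col, max as max_, min as min_

step_ms = 40   # 1000 ms / 25 Hz

# epoch in milliseconds, truncated down to the 40 ms grid
epoch_ms = (col("timestamp").cast("double") * 1000 / step_ms).cast("bigint") * step_ms

with_epoch = df1.withColumn("dummy", epoch_ms)
min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

# integer step, so spark.range accepts it
grid = spark.range(min_epoch, max_epoch + 1, step_ms).toDF("dummy")
resampled = (grid.join(with_epoch, "dummy", "left")
                 .orderBy("dummy")
                 .withColumn("timestamp_resampled",
                             (col("dummy") / 1000).cast("timestamp")))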

Upvotes: 1
