Reputation: 3
I am trying to filter a PySpark dataframe based on a list of tuples of timestamps [(start1, stop1), (start2, stop2), ...]. Each tuple represents a time window. The PySpark dataframe has the following structure:
+-------------------+------+
| ts| var|
+-------------------+------+
|2018-09-01 20:10:00| 0|
|2018-09-01 20:12:00| 2|
|2018-09-01 20:13:00| 1|
|2018-09-01 20:17:00| 5|
+-------------------+------+
ts is a column of timestamps and var is a column of a variable of interest. I am looking for an efficient method to filter out all rows that are not within one of the time windows. For example, if my list of time windows consists of a single window [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))],
the filtered dataframe should be
+-------------------+------+
| ts| var|
+-------------------+------+
|2018-09-01 20:12:00| 2|
|2018-09-01 20:13:00| 1|
+-------------------+------+
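For reference, the single-window case can be written directly with PySpark's built-in Column.between, which is inclusive on both bounds (a minimal sketch, assuming the dataframe shown above is bound to df and pyspark.sql.functions is imported under the common alias F):

from datetime import datetime
from pyspark.sql import functions as F

# single window: keep rows with 20:11 <= ts <= 20:14
df.filter(F.col("ts").between(datetime(2018, 9, 1, 20, 11),
                              datetime(2018, 9, 1, 20, 14))).show()

The difficulty is generalizing this efficiently to an arbitrary list of windows.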
I was able to come up with a working code snippet using a UDF and a for loop that iterates over all time windows for each row (see the code below). However, looping over all time windows for each row is slow. If someone could point out a more efficient solution, I would greatly appreciate it.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# create PySpark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
               datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
        'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))

# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]

# udf for filtering: True if t lies within any of the windows
def is_in_windows_udf(windows):
    def _is_in_windows(t, windows):
        for ts_l, ts_h in windows:
            if ts_l <= t <= ts_h:
                return True
        return False
    return udf(lambda t: _is_in_windows(t, windows), BooleanType())

# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()
Upvotes: 0
Views: 2451
Reputation: 1464
A simpler solution could be the one below; because we are doing a union over the same dataset, Spark can parallelize the execution as well:
from pyspark.sql import functions as F

for count, item in enumerate(windows):
    if count == 0:
        # first window initializes the result
        result = df.filter(
            (F.col("ts") <= item[1]) &
            (F.col("ts") >= item[0])
        )
    else:
        # every further window appends its matching rows
        result = result.union(
            df.filter(
                (F.col("ts") <= item[1]) &
                (F.col("ts") >= item[0])
            )
        )
Upvotes: 1
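As a follow-up, the union loop can also be collapsed into a single filter by OR-ing one between condition per window with functools.reduce, which avoids both the Python UDF and scanning df once per window (a sketch, assuming the df and windows objects defined in the question, with at least one window in the list):

from functools import reduce
from pyspark.sql import functions as F

# build one boolean Column: ts in window 1 OR ts in window 2 OR ...
in_any_window = reduce(
    lambda acc, w: acc | F.col("ts").between(w[0], w[1]),
    windows[1:],
    F.col("ts").between(windows[0][0], windows[0][1]),
)
df.filter(in_any_window).show()

Note also that union keeps duplicates: a row falling into two overlapping windows appears twice in result, whereas the single-filter version returns it once. A dropDuplicates() after the loop guards against this if the windows may overlap.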