quantum

Reputation: 3

Filtering rows of a PySpark dataframe using multiple windows

I am trying to filter a PySpark dataframe based on a list of timestamp tuples [(start1, stop1), (start2, stop2), ...], each representing a time window. The PySpark dataframe has the following structure:

+-------------------+------+
|                 ts|   var|
+-------------------+------+
|2018-09-01 20:10:00|     0|
|2018-09-01 20:12:00|     2|
|2018-09-01 20:13:00|     1|
|2018-09-01 20:17:00|     5|
+-------------------+------+

ts is a column of timestamps and var is a column of a variable of interest. I am looking for an efficient way to filter out all rows that do not fall within one of the time windows. For example, if my list of time windows consists of the single window [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))], the filtered dataframe should be

+-------------------+------+ 
|                 ts|   var| 
+-------------------+------+ 
|2018-09-01 20:12:00|     2| 
|2018-09-01 20:13:00|     1|
+-------------------+------+ 

I was able to come up with a working code snippet using a UDF and a for-loop that iterates, for each row, over all time windows (see code below). However, looping over all time windows for every row is slow.

If someone could point out a more efficient solution, I would greatly appreciate it. A minimal working example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# create Pyspark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
               datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
         'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))

# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]

# udf for filtering: True if t falls inside any of the windows
def is_in_windows_udf(windows):
    def _is_in_windows(t, windows):
        for ts_l, ts_h in windows:
            if ts_l <= t <= ts_h:
                return True
        # only report False once every window has been checked
        return False
    return udf(lambda t: _is_in_windows(t, windows), BooleanType())

# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()

Upvotes: 0

Views: 2451

Answers (1)

Ankit Kumar Namdeo

Reputation: 1464

A simpler solution could be the one below: since each filter runs over the same dataframe and the results are unioned, Spark can also parallelize the execution:

from pyspark.sql import functions as F

for count, (start, stop) in enumerate(windows):
    # rows falling inside the current window
    matched = df.filter(
        (F.col("ts") >= start) &
        (F.col("ts") <= stop)
    )
    # first window starts the result; later windows are unioned onto it
    result = matched if count == 0 else result.union(matched)
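A union-free variant of the same idea builds a single boolean column by OR-combining one between-clause per window, e.g. `df.where(reduce(or_, [F.col("ts").between(s, e) for s, e in windows]))` (note that `Column.between` is inclusive on both ends, matching the question). A minimal pure-Python sketch of the predicate combination, using plain tuples in place of a Spark dataframe for illustration:

```python
from datetime import datetime
from functools import reduce
from operator import or_

windows = [
    (datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14)),
]

def in_any_window(t, windows):
    # OR-combine one membership test per window; reduce mirrors how
    # per-window Column expressions would be combined in PySpark.
    return reduce(or_, (start <= t <= stop for start, stop in windows))

rows = [
    (datetime(2018, 9, 1, 20, 10), 0),
    (datetime(2018, 9, 1, 20, 12), 2),
    (datetime(2018, 9, 1, 20, 13), 1),
    (datetime(2018, 9, 1, 20, 17), 5),
]
filtered = [(ts, var) for ts, var in rows if in_any_window(ts, windows)]
```

In Spark this keeps the whole filter as one expression over one scan of the data, rather than one scan per window followed by unions.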

Upvotes: 1
