Reputation: 3440
I have a Spark dataframe, like so:
# For sake of simplicity only one id is shown, but there are multiple objects
+---+-------------------+------+
| id|             timstm|signal|
+---+-------------------+------+
| X1|2022-07-01 00:00:00|  null|
| X1|2022-07-02 00:00:00|  true|
| X1|2022-07-03 00:00:00|  null|
| X1|2022-07-05 00:00:00|  null|
| X1|2022-07-09 00:00:00|  true|
+---+-------------------+------+
And I want to create a new column that contains the time (in days) since the signal column was last true:
+---+-------------------+------+---------+
| id|             timstm|signal|time_diff|
+---+-------------------+------+---------+
| X1|2022-07-01 00:00:00|  null|     null|
| X1|2022-07-02 00:00:00|  true|      0.0|
| X1|2022-07-03 00:00:00|  null|      1.0|
| X1|2022-07-05 00:00:00|  null|      3.0|
| X1|2022-07-09 00:00:00|  true|      0.0|
+---+-------------------+------+---------+
Any ideas on how to approach this? My intuition is to somehow use window and filter to achieve this, but I'm not sure how.
Upvotes: 0
Views: 90
Reputation: 351
You are right, all you need is a Window function and some "filtering", no fillna or Pandas API needed.
The idea is: for each row, pick the largest timestamp, among all earlier rows and the current row, where signal is True:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, max

# the example DataFrame from the question, for completeness
df = spark.createDataFrame([('X1', '2022-07-01 00:00:00', None),
                            ('X1', '2022-07-02 00:00:00', True),
                            ('X1', '2022-07-03 00:00:00', None),
                            ('X1', '2022-07-05 00:00:00', None),
                            ('X1', '2022-07-09 00:00:00', True)],
                           "id string, timstm string, signal boolean")

# all earlier rows and the current row per id, ordered by the column "timstm"
all_earlier_and_current_rows = \
    Window.partitionBy(col("id")) \
          .orderBy(col("timstm")) \
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# keep the timestamp only where signal is true, then take the latest such timestamp seen so far
df.withColumn("timstmWhenTrue", when(col("signal"), col("timstm"))) \
  .withColumn("timstmLastTrue", max(col("timstmWhenTrue")).over(all_earlier_and_current_rows)) \
  .show()
The result looks as expected:
+---+-------------------+------+-------------------+-------------------+
| id|             timstm|signal|     timstmWhenTrue|     timstmLastTrue|
+---+-------------------+------+-------------------+-------------------+
| X1|2022-07-01 00:00:00|  null|               null|               null|
| X1|2022-07-02 00:00:00|  true|2022-07-02 00:00:00|2022-07-02 00:00:00|
| X1|2022-07-03 00:00:00|  null|               null|2022-07-02 00:00:00|
| X1|2022-07-05 00:00:00|  null|               null|2022-07-02 00:00:00|
| X1|2022-07-09 00:00:00|  true|2022-07-09 00:00:00|2022-07-09 00:00:00|
+---+-------------------+------+-------------------+-------------------+
Finally, you can .drop("timstmWhenTrue"), as that column is not needed any more.
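If you also want the numeric time_diff column from the question, one way (not part of the original answer; it assumes whole-day differences, as in the expected output, and continues from the snippet above) is to add a datediff against timstmLastTrue and cast it to double:
from pyspark.sql.functions import datediff

# days between the current timestamp and the last timestamp where signal was true;
# stays null until the first true signal has been seen
result = df.withColumn("timstmWhenTrue", when(col("signal"), col("timstm"))) \
           .withColumn("timstmLastTrue", max(col("timstmWhenTrue")).over(all_earlier_and_current_rows)) \
           .withColumn("time_diff", datediff(col("timstm"), col("timstmLastTrue")).cast("double")) \
           .drop("timstmWhenTrue", "timstmLastTrue")
result.show()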
Upvotes: 0
Reputation: 687
So this logic is a bit hard to express in native PySpark. It might be easier to express it as a Pandas UDF. I will use the Fugue library to bring Python/Pandas code to a Pandas UDF, but if you don't want to use Fugue you can still bring it to a Pandas UDF yourself; it just takes a lot more code (a rough sketch is at the end of this answer).
Setup
Here I am just creating the DataFrame from the example. I know this is a Pandas DataFrame; we will convert it to Spark and run the solution on Spark later.
I suggest filling the nulls with False in the original DataFrame. This is because the Pandas code uses a group-by, and NULL values are dropped by default in a Pandas groupby. Filling the NULLs with False makes it work properly (and I think it also makes the conversion between Spark and Pandas easier).
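As a quick illustration of why that matters (this small demo is not part of the original answer), a default pandas groupby silently drops rows whose group key is None/NaN:
import pandas as pd

# the row whose group key is None disappears from a default groupby
demo = pd.DataFrame({"signal": [True, None, True], "val": [1, 2, 3]})
print(demo.groupby("signal")["val"].sum())  # only the True group is shown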
import pandas as pd
df = pd.DataFrame({"id": ["X1"]*5,
                   "timestm": ["2022-07-01", "2022-07-02", "2022-07-03", "2022-07-05", "2022-07-09"],
                   "signal": [None, True, None, None, True]})
df['timestm'] = pd.to_datetime(df['timestm'])
df['signal'] = df['signal'].fillna(False)
Solution 1
So when we use a Pandas UDF, the important piece is that the function is applied per Spark partition, which means the function only needs to be able to handle one id. We then partition the Spark DataFrame by id and run the function on each group later.
Also be aware that row order is not guaranteed, so we sort the data by time as the first step. The Pandas code here is really just taken from another post and modified.
def process(df: pd.DataFrame) -> pd.DataFrame:
    # order is not guaranteed within a partition, so sort by time first
    df = df.sort_values('timestm')
    # days between consecutive rows
    df['days_since_last_event'] = df['timestm'].diff().apply(lambda x: x.days)
    # accumulate those gaps within each "since the last True" group
    df.loc[:, 'days_since_last_event'] = df.groupby(df['signal'].shift().cumsum())['days_since_last_event'].cumsum()
    # rows where signal itself is True are 0 days since the last True
    df.loc[df['signal'] == True, 'days_since_last_event'] = 0
    return df

process(df)
This will give us:
 id    timestm  signal  days_since_last_event
 X1 2022-07-01   False                    NaN
 X1 2022-07-02    True                    0.0
 X1 2022-07-03   False                    1.0
 X1 2022-07-05   False                    3.0
 X1 2022-07-09    True                    0.0
Which looks right. Now we can bring it to Spark using Fugue with minimal additional lines of code. This will partition the data and run the function on each partition. A schema is a requirement for Pandas UDFs, so Fugue needs it as well, but it uses a simpler way to define it.
import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
out = fa.transform(sdf, process, schema="*, days_since_last_event:int", partition={"by": "id"})
# out is a Spark DataFrame because a Spark DataFrame was passed in
out.show()
which gives us:
+---+-------------------+------+---------------------+
| id|            timestm|signal|days_since_last_event|
+---+-------------------+------+---------------------+
| X1|2022-07-01 00:00:00| false|                 null|
| X1|2022-07-02 00:00:00|  true|                    0|
| X1|2022-07-03 00:00:00| false|                    1|
| X1|2022-07-05 00:00:00| false|                    3|
| X1|2022-07-09 00:00:00|  true|                    0|
+---+-------------------+------+---------------------+
Note that you need to define the partition when running on the full data, so that each id is processed separately.
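For reference, here is a rough sketch of how the same function could be wired up without Fugue, using Spark's native applyInPandas (not part of the original answer; the schema string is an assumption matching the columns that process returns):
# a non-Fugue alternative: apply the same pandas function to each id group
out_native = sdf.groupBy("id").applyInPandas(
    process,
    schema="id string, timestm timestamp, signal boolean, days_since_last_event double")
out_native.show()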
Upvotes: 1