VSP
VSP

Reputation: 379

How to create new column in pyspark where the conditional depends on the subsequent values of a column?

I have the following pyspark dataframe. As shown in the picture below I need to create column Value2 from column Value1. The issue is that for all the rows with the same value of time, the value in column Value2 needs to be the first value corresponding to this time in Value1. Therefore, if you look at the picture for all the rows for which the time is 16:07 the value needs to be 100. I know I need to use some form of conditional but I could not work out how to do it so I was wondering if someone could point me in the right direction.

data={
    ‘Name’:[‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’, ‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’],
    ’Value1’:[10,20,50,60,100,200,250,300,450,520,570,600,630,670,690,700,720],
    ’Time’:[‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:06:00’, ‘ 15/06/2020  16:07:00’, ‘ 15/06/2020  16:07:00’, ‘ 15/06/2020  16:08:00’, ‘ 15/06/2020  16:09:00’, ‘ 15/06/2020  16:10:00’, ‘ 15/06/2020  17:20:00’, ‘ 15/06/2020  17:21:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, , ‘ 15/06/2020  17:25:00’, , ‘ 15/06/2020  17:26:00’}

df=pd.DataFrame(data)
df_spark=spark.createDataFrame(df)

enter image description here

Upvotes: 0

Views: 880

Answers (2)

Arjoon
Arjoon

Reputation: 200

You have 2 options:

  1. Use a window function to extract the first value/minimum value in Value 1. Note: If this takes too long, use option 2. Window functions are not performant in general and should be avoided on large dataframes.
  1. The second option is create an aggregate table and join that back into your dataframe. This is the more performant method if your dataset is large but requires 2 steps instead. The query for the aggregate table could look something like this: spark.sql("SELECT Name, Time, Min(Value1) FROM Table GROUP BY Name, Time").createOrReplaceTempView("Aggregate_Table"). And the query for your final table would be: spark.sql("SELECT a.*, b.Time AS Value2 FROM Table a INNER JOIN Aggregate_Table b ON a.Name = b.Name AND a.Time = b.Time")

Upvotes: 1

Nico Arbar
Nico Arbar

Reputation: 162

Try the window functions. You get the min value1 in the window defined by the column 'time':

    from pyspark.sql import Window
    window = Window.partitionBy('Time')
    df_spark.withColumn('Value2', min('Value1').over(window))

Upvotes: 1

Related Questions