Reputation: 874
I am converting my code to PySpark. Please find the example below; what is the alternative to convert it to PySpark? I couldn't find shift in PySpark. The main purpose of the code below is to check the next row and cumulatively sum a counter when the values are not equal.
example['date_next'] = example.groupby("A")['date'].shift(-1).reset_index()
s = (example[['A', 'B', 'C', 'D']]
     .ne(example[['A', 'B', 'C', 'D']].shift(1))
     .any(axis=1)
     .cumsum())
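For reference, this is what that idiom produces on a tiny hypothetical frame (made-up data, purely to illustrate): the counter advances whenever any of A, B, C, D differs from the previous row.
import pandas as pd

example = pd.DataFrame({
    'A': [1, 1, 1],
    'B': [50, 56, 56],
    'C': [30, 30, 30],
    'D': [40, 30, 30],
})
# ne() against shift(1) compares each row with the previous one;
# any(axis=1) flags a change in any column; cumsum() numbers the runs.
s = (example[['A', 'B', 'C', 'D']]
     .ne(example[['A', 'B', 'C', 'D']].shift(1))
     .any(axis=1)
     .cumsum())
print(s.tolist())  # [1, 2, 2]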
Thank you in advance
Upvotes: 1
Views: 113
Reputation: 26676
Not as straightforward. Using arrays simplifies the logic; see the code and comments below.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [(1, 'A', 50, 30, 40, 78, 65, None),
     (1, 'B', 56, 30, 30, 25, 67, 75),
     (1, 'C', 56, 30, 30, 25, 67, 75)],
    ['Id', 'Type', 'A', 'B', 'C', 'D', 'E', 'F'])
df.show()

w = Window.partitionBy().orderBy('Id')

(df.withColumn('ABCD', F.array('A', 'B', 'C', 'D'))   # combine the columns into an array
   .withColumn('ABCDShift', F.lead('ABCD').over(w))   # shift rows: next row's array
   # Check whether the array elements agree at every index; 0 if all agree, otherwise 1
   .withColumn('cumsum', F.when(F.expr("forall(transform(ABCD, (c, i) -> c == ABCDShift[i]), c -> c == true)"), 0).otherwise(1))
   .withColumn('cumsum', F.sum('cumsum').over(w.rowsBetween(Window.unboundedPreceding, 0)))  # running sum
   .drop('ABCDShift', 'ABCD')
   .show(truncate=False))
+---+----+---+---+---+---+---+----+------+
|Id |Type|A  |B  |C  |D  |E  |F   |cumsum|
+---+----+---+---+---+---+---+----+------+
|1  |A   |50 |30 |40 |78 |65 |null|1     |
|1  |B   |56 |30 |30 |25 |67 |75  |1     |
|1  |C   |56 |30 |30 |25 |67 |75  |2     |
+---+----+---+---+---+---+---+----+------+
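If you prefer to mirror pandas' shift(1) (compare with the previous row) and avoid the higher-order forall/transform, a rough sketch along these lines should also work. Assumptions: partitioning by Id and ordering by Type to get a deterministic order (both are my choices, not from the question); because it compares against the previous row, the counter starts at 1 on the first row, so the numbers differ from the lead-based output above.
from pyspark.sql import functions as F, Window

w = Window.partitionBy('Id').orderBy('Type')

(df.withColumn('prev', F.lag(F.struct('A', 'B', 'C', 'D')).over(w))   # previous row's values as a struct
   .withColumn('changed', F.when(F.struct('A', 'B', 'C', 'D') == F.col('prev'), 0).otherwise(1))  # 1 when anything changed (or first row)
   .withColumn('cumsum', F.sum('changed').over(w.rowsBetween(Window.unboundedPreceding, 0)))      # running sum
   .drop('prev', 'changed')
   .show(truncate=False))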
Upvotes: 1