Reputation: 97
I have a dataframe with multiple products for each date, by customer. In a new column, I am trying to get the previous unique date for each customer.
Cst  Prod  Dt         Desired Output
C1   P1    1-Jan-16   0
C1   P2    1-Jan-16   0
C1   P3    1-Jan-16   0
C1   P4    1-Jan-16   0
C1   P1    20-Jan-16  1-Jan-16
C1   P2    20-Jan-16  1-Jan-16
C2   P2    5-Feb-17   0
C2   P3    5-Feb-17   0
C2   P4    5-Feb-17   0
C2   P1    30-Mar-17  5-Feb-17
I am just starting with PySpark. So far, I tried creating an array column of dates (CUM_DATE) for each customer, then applying a UDF to get all dates in the row except the current one, and then taking the max of that array column.
Something along the lines of:
def filter_currdate(arr, dt):
    return [x for x in arr if x != dt]

filter_currdate_udf = F.udf(filter_currdate, ArrayType(DateType()))

df = df.withColumn('except_date', filter_currdate_udf(df['CUM_DATE'], df['Dt']))
df = df.withColumn('max_prev_date', F.max(df['except_date']))
But it runs into an error, and I am unable to figure out a better way to get this output.
Upvotes: 0
Views: 88
Reputation: 6385
There is another way, without a custom UDF. Say df has columns cst, prod, and dt:
from pyspark.sql.functions import col, max

(df.alias('df1')
   .join(df.alias('df2'),
         (col('df1.cst') == col('df2.cst'))
         & (col('df1.prod') == col('df2.prod'))
         & (col('df1.dt') > col('df2.dt')),
         how='left_outer')
   .select('df1.*', 'df2.dt')
   .groupBy('df1.cst', 'df1.prod', 'df1.dt')
   .agg(max('df2.dt')))
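The self-join computes, for each row, the maximum of all strictly earlier dates with the same join keys. Note that the desired output in the question keys on the customer only (C2/P1 gets 5-Feb-17 even though P1 has no earlier row), so you may need to drop the prod condition from the join. The core "max of strictly earlier dates for the customer" logic can be sanity-checked in plain Python against the sample data (a sketch; prev_date is a hypothetical helper, not part of the Spark solution):

```python
from datetime import date

# Sample rows from the question: (customer, product, date)
rows = [
    ('C1', 'P1', date(2016, 1, 1)),
    ('C1', 'P2', date(2016, 1, 1)),
    ('C1', 'P3', date(2016, 1, 1)),
    ('C1', 'P4', date(2016, 1, 1)),
    ('C1', 'P1', date(2016, 1, 20)),
    ('C1', 'P2', date(2016, 1, 20)),
    ('C2', 'P2', date(2017, 2, 5)),
    ('C2', 'P3', date(2017, 2, 5)),
    ('C2', 'P4', date(2017, 2, 5)),
    ('C2', 'P1', date(2017, 3, 30)),
]

def prev_date(cst, dt):
    """Max of all dates strictly before dt for the same customer, else None."""
    earlier = [d for c, _, d in rows if c == cst and d < dt]
    return max(earlier) if earlier else None

print(prev_date('C2', date(2017, 3, 30)))  # 2017-02-05, matching the desired output
print(prev_date('C1', date(2016, 1, 1)))   # None (shown as 0 in the question)
```

This is exactly what the left outer join plus groupBy/max produces, with None (null in Spark) for rows that have no earlier date.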
Upvotes: 1