OdiumPura

Reputation: 631

Look back X days and get column values based on a condition in Spark

I have the following DF:

--------------------------------
|Id |Date       |Value   |cond  |
|-------------------------------|
|1  |2022-08-03 |     100| 1    |                         
|1  |2022-08-04 |     200| 2    |
|1  |2022-08-05 |     150| 3    |
|1  |2022-08-06 |     300| 4    |
|1  |2022-08-07 |     400| 5    |
|1  |2022-08-08 |     150| 6    |
|1  |2022-08-09 |     500| 7    |
|1  |2022-08-10 |     150| 8    |
|1  |2022-08-11 |     150| 9    |
|1  |2022-08-12 |     700| 1    |
|1  |2022-08-13 |     800| 2    |
|1  |2022-08-14 |     150| 2    |
|1  |2022-08-15 |     300| 0    |
|1  |2022-08-16 |     200| 1    |
|1  |2022-08-17 |     150| 3    |
|1  |2022-08-18 |     150| 1    |
|1  |2022-08-19 |     250| 4    |
|1  |2022-08-20 |     150| 5    |
|1  |2022-08-21 |     400| 6    |    
|2  |2022-08-03 |     100| 1    |
|2  |2022-08-04 |     200| 2    |
|2  |2022-08-05 |     150| 1    |
|2  |2022-08-06 |     300| 1    |
|2  |2022-08-07 |     400| 1    |
|2  |2022-08-08 |     150| 1    |
|2  |2022-08-09 |     125| 1    |
|2  |2022-08-10 |     150| 1    |
|2  |2022-08-11 |     150| 3    |
|2  |2022-08-12 |     170| 6    |
|2  |2022-08-13 |     150| 7    |
|2  |2022-08-14 |     150| 8    |
|2  |2022-08-15 |     300| 1    |
|2  |2022-08-16 |     150| 9    |
|2  |2022-08-17 |     150| 0    |
|2  |2022-08-18 |     400| 1    |
|2  |2022-08-19 |     150| 1    |
|2  |2022-08-20 |     500| 1    |
|2  |2022-08-21 |     150| 1    |
--------------------------------

And this one:

---------------------
|Date       | cond  |
|-------------------|
|2022-08-03 |  1    |
|2022-08-04 |  2    |
|2022-08-05 |  1    |
|2022-08-06 |  1    |
|2022-08-07 |  1    |
|2022-08-08 |  1    |
|2022-08-09 |  1    |
|2022-08-10 |  1    |
|2022-08-11 |  3    |
|2022-08-12 |  6    |
|2022-08-13 |  8    |
|2022-08-14 |  9    |
|2022-08-15 |  1    |
|2022-08-16 |  2    |
|2022-08-17 |  2    |
|2022-08-18 |  0    |
|2022-08-19 |  1    |
|2022-08-20 |  3    |
|2022-08-21 |  1    |
--------------------

My expected output is:

-------------------------------
|Id |Date       |Avg   |Count|
|-----------------------------|
|1  |2022-08-03 |     0| 0    |                         
|1  |2022-08-04 |     0| 0    |
|1  |2022-08-05 |     0| 0    |
|1  |2022-08-06 |     0| 0    |
|1  |2022-08-07 |     0| 0    |
|1  |2022-08-08 |     0| 0    |
|1  |2022-08-09 |     0| 0    |
|1  |2022-08-10 |     0| 0    |
|1  |2022-08-11 |     0| 0    |
|1  |2022-08-12 |     0| 0    |
|1  |2022-08-13 |     0| 0    |
|1  |2022-08-14 |     0| 0    |
|1  |2022-08-15 |     0| 0    |
|1  |2022-08-16 |     0| 0    |
|1  |2022-08-17 |     0| 0    |
|1  |2022-08-18 |     0| 0    |
|1  |2022-08-19 |     0| 0    |
|1  |2022-08-20 |     0| 0    |
|1  |2022-08-21 |     0| 0    |    
|2  |2022-08-03 |     0| 0    |
|2  |2022-08-04 |     0| 0    |
|2  |2022-08-05 |     0| 1    |
|2  |2022-08-06 |     0| 2    |
|2  |2022-08-07 |     0| 3    |
|2  |2022-08-08 | 237,5| 4    |
|2  |2022-08-09 |   250| 4    |
|2  |2022-08-10 |243,75| 4    |
|2  |2022-08-11 |     0| 0    |
|2  |2022-08-12 |     0| 0    |
|2  |2022-08-13 |     0| 0    |
|2  |2022-08-14 |     0| 0    |
|2  |2022-08-15 |206,25| 4    |
|2  |2022-08-16 |     0| 0    |
|2  |2022-08-17 |     0| 0    |
|2  |2022-08-18 |     0| 0    |
|2  |2022-08-19 |243,75| 4    |
|2  |2022-08-20 |     0| 0    |
|2  |2022-08-21 | 337,5| 4    |
-------------------------------

The algorithm is:

  1. Verify if Date and Cond are the same in the first and second DFs.
  2. If the condition is true, I need to look back four days in DF1 (D-1, D-2, D-3, D-4) based on Cond and calculate the average (Avg) and count of those values. If there are more than 4 such days, I need to use the most recent 4 values to calculate the Avg, and Count is always 4 in that case.

Example situations based on the inputs (the Id = 2, Date = 2022-08-08 case is worked out after the list):

  1. Id = 1, Date = 2022-08-08
  2. Id = 2, Date = 2022-08-08
  3. Id = 2, Date = 2022-08-07
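
For the second situation (Id = 2, Date = 2022-08-08), Date and Cond = 1 match DF2, and the most recent Cond = 1 values for Id 2 come from 2022-08-03, 2022-08-05, 2022-08-06 and 2022-08-07, which gives the 237.5 / 4 row of the expected output:

values = [100, 150, 300, 400]    # Value of the 4 most recent Cond = 1 rows for Id 2 before 2022-08-08
avg = sum(values) / len(values)  # 237.5
count = len(values)              # 4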

I tried to use window functions, but with no success. I was able to achieve the output DF using SQL (joins with OUTER APPLY), but Spark doesn't have OUTER APPLY. So, my questions are:

  1. How to generate the output DF.
  2. What is the best way to generate the output DF.

MVCE to generate the input DFs in pyspark:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data_1 = [
    ("1","2022-08-03",100,1),
    ("1","2022-08-04",200,2),
    ("1","2022-08-05",150,3),
    ("1","2022-08-06",300,4),
    ("1","2022-08-07",400,5),
    ("1","2022-08-08",150,6),
    ("1","2022-08-09",500,7),
    ("1","2022-08-10",150,8),
    ("1","2022-08-11",150,9),
    ("1","2022-08-12",700,1),
    ("1","2022-08-13",800,2),
    ("1","2022-08-14",150,2),
    ("1","2022-08-15",300,0),
    ("1","2022-08-16",200,1),
    ("1","2022-08-17",150,3),
    ("1","2022-08-18",150,1),
    ("1","2022-08-19",250,4),
    ("1","2022-08-20",150,5),
    ("1","2022-08-21",400,6),
    ("2","2022-08-03",100,1),
    ("2","2022-08-04",200,2),
    ("2","2022-08-05",150,1),
    ("2","2022-08-06",300,1),
    ("2","2022-08-07",400,1),
    ("2","2022-08-08",150,1),
    ("2","2022-08-09",125,1),
    ("2","2022-08-10",150,1),
    ("2","2022-08-11",150,3),
    ("2","2022-08-12",170,6),
    ("2","2022-08-13",150,7),
    ("2","2022-08-14",150,8),
    ("2","2022-08-15",300,1),
    ("2","2022-08-16",150,9),
    ("2","2022-08-17",150,0),
    ("2","2022-08-18",400,1),
    ("2","2022-08-19",150,1),
    ("2","2022-08-20",500,1),
    ("2","2022-08-21",150,1)
    
]

# Date is kept as a string here and cast to a date/timestamp in the answers below
schema_1 = StructType([
    StructField("Id", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Value", IntegerType(), True),
    StructField("Cond", IntegerType(), True)
])

df_1 = spark.createDataFrame(data=data_1,schema=schema_1)


data_2 = [
     ("2022-08-03", 1),
     ("2022-08-04", 2),
     ("2022-08-05", 1),
     ("2022-08-06", 1),
     ("2022-08-07", 1),
     ("2022-08-08", 1),
     ("2022-08-09", 1),
     ("2022-08-10", 1),
     ("2022-08-11", 3),
     ("2022-08-12", 6),
     ("2022-08-13", 8),
     ("2022-08-14", 9),
     ("2022-08-15", 1),
     ("2022-08-16", 2),
     ("2022-08-17", 2),
     ("2022-08-18", 0),
     ("2022-08-19", 1),
     ("2022-08-20", 3),
     ("2022-08-21", 1)
 ]

schema_2 = StructType([
    StructField("Date", StringType(), True),
    StructField("Cond", IntegerType(), True)
])

df_2 = spark.createDataFrame(data=data_2,schema=schema_2)
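
For completeness, a minimal session setup this MVCE assumes (in a notebook or spark-shell the spark object already exists; the builder settings here are only illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("lookback-mvce").getOrCreate()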

UPDATE: I updated the question to be clearer about the conditions used to join the DFs!

Upvotes: 1

Views: 538

Answers (2)

viggnah

Reputation: 1879

  1. Do a left join to get the dates you are interested in.
  2. Then use a pyspark.sql.Window to collect the values you need into a list and take the size of that list as Count.
  3. Finally, with the help of pyspark.sql.functions.aggregate, compute the Avg.

from pyspark.sql import functions as F, Window

# cast to date, and rename columns for later use
df_1 = df_1.withColumn("Date", F.col("Date").cast("date"))
df_2 = df_2.withColumn("Date", F.col("Date").cast("date"))
df_2 = df_2.withColumnRenamed("Date", "DateDf2")\
        .withColumnRenamed("Cond", "CondDf2")

# left join 
df = df_1.join(df_2, (df_1.Cond==df_2.CondDf2)&(df_1.Date==df_2.DateDf2), how='left')

windowSpec = Window.partitionBy("Id", "Cond").orderBy("Date")

# all the magic happens here!
df = (
    # only start counting when "DateDf2" is not null, and put the values into a list
    df.withColumn("value_list", F.when(F.isnull("DateDf2"), F.array()).otherwise(F.collect_list("Value").over(windowSpec.rowsBetween(-4, -1))))
    .withColumn("Count", F.size("value_list"))
    # use aggregate to sum up the list only if the size is 4! and divide by 4 to get average
    .withColumn("Avg", F.when(F.col("count")==4, F.aggregate("value_list", F.lit(0), lambda acc,x: acc+x)/4).otherwise(F.lit(0)))
    .select("Id", "Date", "Avg", "Count")
    .orderBy("Id", "Date")
)
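
Note: F.aggregate was only added in Spark 3.1. On an older cluster (2.4 / 3.0) the same sum over the collected list can be expressed with the SQL higher-order function instead; a minimal sketch, meant to replace the Avg expression in the chain above:

# Spark < 3.1: sum the collected list with the SQL aggregate() higher-order function
avg_col = F.when(F.col("Count") == 4,
                 F.expr("aggregate(value_list, 0, (acc, x) -> acc + x)") / 4
                ).otherwise(F.lit(0))
# ...then use .withColumn("Avg", avg_col) in place of the F.aggregate line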

Output is:

+---+----------+------+-----+
|Id |Date      |Avg   |Count|
+---+----------+------+-----+
|1  |2022-08-03|0.0   |0    |
|1  |2022-08-04|0.0   |0    |
|1  |2022-08-05|0.0   |0    |
|1  |2022-08-06|0.0   |0    |
|1  |2022-08-07|0.0   |0    |
|1  |2022-08-08|0.0   |0    |
|1  |2022-08-09|0.0   |0    |
|1  |2022-08-10|0.0   |0    |
|1  |2022-08-11|0.0   |0    |
|1  |2022-08-12|0.0   |0    |
|1  |2022-08-13|0.0   |0    |
|1  |2022-08-14|0.0   |0    |
|1  |2022-08-15|0.0   |0    |
|1  |2022-08-16|0.0   |0    |
|1  |2022-08-17|0.0   |0    |
|1  |2022-08-18|0.0   |0    |
|1  |2022-08-19|0.0   |0    |
|1  |2022-08-20|0.0   |0    |
|1  |2022-08-21|0.0   |0    |
|2  |2022-08-03|0.0   |0    |
|2  |2022-08-04|0.0   |0    |
|2  |2022-08-05|0.0   |1    |
|2  |2022-08-06|0.0   |2    |
|2  |2022-08-07|0.0   |3    |
|2  |2022-08-08|237.5 |4    |
|2  |2022-08-09|250.0 |4    |
|2  |2022-08-10|243.75|4    |
|2  |2022-08-11|0.0   |0    |
|2  |2022-08-12|0.0   |0    |
|2  |2022-08-13|0.0   |0    |
|2  |2022-08-14|0.0   |0    |
|2  |2022-08-15|206.25|4    |
|2  |2022-08-16|0.0   |0    |
|2  |2022-08-17|0.0   |0    |
|2  |2022-08-18|0.0   |0    |
|2  |2022-08-19|243.75|4    |
|2  |2022-08-20|0.0   |0    |
|2  |2022-08-21|337.5 |4    |
+---+----------+------+-----+

Upvotes: 1

Deku07

Reputation: 146

Here is a solution for the same requirement.


Solution:
from pyspark.sql import Window
import pyspark.sql.functions as F

df_1 = df_1.withColumn("Date", F.col("Date").cast("timestamp"))
df_2 = df_2.withColumn("Date", F.col("Date").cast("timestamp"))

# per-Id ordering by Date, used to assign a row number (rnk)
window_spec = Window.partitionBy(["Id"]).orderBy("Date")
# the 4 rows before the current one (by rnk) within each Id
four_days_sld_wnd_excl_current_row = Window.partitionBy(["Id"]).orderBy(["rnk"]).rangeBetween(-4, -1)
# the 4 calendar days before the current one within each Id
window_spec_count_cond_ = Window.partitionBy(["Id"]).orderBy(F.unix_timestamp("Date", 'yyyy-MM-dd') / 86400).rangeBetween(-4, -1)

agg_col_cond_ = (F.col("agg") == 0.0)
date_2_col_cond_ = (F.col("Date_2").isNull())

# sum of the previous 4 values, only when the row matched df_2 and a full 4-row history exists
valid_4_days_agg_value = (F.when((~date_2_col_cond_) & (F.size(F.col("date_arrays_with_cond_1")) == 4),
                                 F.sum(F.col("Value")).over(four_days_sld_wnd_excl_current_row)).otherwise(F.lit(0.0)))

# 4 when the row matched df_2 and agg is non-zero, 0 when it neither matched nor has an agg,
# otherwise the number of matched dates seen in the previous 4 days
count_cond_ = (F.when(~agg_col_cond_ & ~date_2_col_cond_, F.lit(4))
               .when(agg_col_cond_ & date_2_col_cond_, F.lit(0))
               .otherwise(F.size(F.collect_set(F.col("Date_2")).over(window_spec_count_cond_))))

# left join on Date and Cond; Date_2 is null when the row has no match in df_2
df_jn = df_1.join(df_2, ["Date", "Cond"], "left")\
    .select(df_1["*"], df_2["Date"].alias("Date_2")).orderBy("Id", df_1["Date"])

filter_having_cond_1 = (F.col("Cond") == 1)

# rows with Cond == 1 are processed with the windows above; all other rows get agg = 0 and count = 0
df_fnl_with_cond_val_1 = df_jn.filter(filter_having_cond_1)
df_fnl_with_cond_val_other = df_jn.filter(~filter_having_cond_1)\
    .withColumn("agg", F.lit(0.0))\
    .withColumn("count", F.lit(0))\
    .drop("Date_2")

df_fnl_with_cond_val_1 = df_fnl_with_cond_val_1\
    .withColumn("rnk", F.row_number().over(window_spec))\
    .withColumn("date_arrays_with_cond_1", F.collect_set(F.col("Date")).over(four_days_sld_wnd_excl_current_row))\
    .withColumn("agg", valid_4_days_agg_value / 4)\
    .withColumn("count", count_cond_)\
    .drop("date_arrays_with_cond_1", "rnk", "Date_2")

df_fnl = df_fnl_with_cond_val_1.unionByName(df_fnl_with_cond_val_other)
df_fnl.orderBy(["Id", "Date"]).show(50, 0)

Kindly upvote if you like my solution.

Output:
+---+-------------------+-----+----+------+-----+
|Id |Date               |Value|Cond|agg   |count|
+---+-------------------+-----+----+------+-----+
|1  |2022-08-03 00:00:00|100  |1   |0.0   |0    |
|1  |2022-08-04 00:00:00|200  |2   |0.0   |0    |
|1  |2022-08-05 00:00:00|150  |3   |0.0   |0    |
|1  |2022-08-06 00:00:00|300  |4   |0.0   |0    |
|1  |2022-08-07 00:00:00|400  |5   |0.0   |0    |
|1  |2022-08-08 00:00:00|150  |6   |0.0   |0    |
|1  |2022-08-09 00:00:00|500  |7   |0.0   |0    |
|1  |2022-08-10 00:00:00|150  |8   |0.0   |0    |
|1  |2022-08-11 00:00:00|150  |9   |0.0   |0    |
|1  |2022-08-12 00:00:00|700  |1   |0.0   |0    |
|1  |2022-08-13 00:00:00|800  |2   |0.0   |0    |
|1  |2022-08-14 00:00:00|150  |2   |0.0   |0    |
|1  |2022-08-15 00:00:00|300  |0   |0.0   |0    |
|1  |2022-08-16 00:00:00|200  |1   |0.0   |0    |
|1  |2022-08-17 00:00:00|150  |3   |0.0   |0    |
|1  |2022-08-18 00:00:00|150  |1   |0.0   |0    |
|1  |2022-08-19 00:00:00|250  |4   |0.0   |0    |
|1  |2022-08-20 00:00:00|150  |5   |0.0   |0    |
|1  |2022-08-21 00:00:00|400  |6   |0.0   |0    |
|2  |2022-08-03 00:00:00|100  |1   |0.0   |0    |
|2  |2022-08-04 00:00:00|200  |2   |0.0   |0    |
|2  |2022-08-05 00:00:00|150  |1   |0.0   |1    |
|2  |2022-08-06 00:00:00|300  |1   |0.0   |2    |
|2  |2022-08-07 00:00:00|400  |1   |0.0   |3    |
|2  |2022-08-08 00:00:00|150  |1   |237.5 |4    |
|2  |2022-08-09 00:00:00|125  |1   |250.0 |4    |
|2  |2022-08-10 00:00:00|150  |1   |243.75|4    |
|2  |2022-08-11 00:00:00|150  |3   |0.0   |0    |
|2  |2022-08-12 00:00:00|170  |6   |0.0   |0    |
|2  |2022-08-13 00:00:00|150  |7   |0.0   |0    |
|2  |2022-08-14 00:00:00|150  |8   |0.0   |0    |
|2  |2022-08-15 00:00:00|300  |1   |206.25|4    |
|2  |2022-08-16 00:00:00|150  |9   |0.0   |0    |
|2  |2022-08-17 00:00:00|150  |0   |0.0   |0    |
|2  |2022-08-18 00:00:00|400  |1   |0.0   |0    |
|2  |2022-08-19 00:00:00|150  |1   |243.75|4    |
|2  |2022-08-20 00:00:00|500  |1   |0.0   |0    |
|2  |2022-08-21 00:00:00|150  |1   |337.5 |4    |
+---+-------------------+-----+----+------+-----+
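
Note that this output still carries the Value and Cond columns; to match the expected column names exactly, a final select along these lines should work:

df_fnl.select("Id", "Date", F.col("agg").alias("Avg"), F.col("count").alias("Count"))\
    .orderBy("Id", "Date").show(50, 0)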

Upvotes: 1
