Wasif Tanveer

Reputation: 139

PySpark: replacing null values with a calculation based on the last non-null value

Hi, my question is related to this one (Fill in null with previously known good value with pyspark), but my requirement is slightly different:

   data:                                        expected output:       
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |  item|store|timestamp|sales_qty|stock|     |  item|store|timestamp|sales_qty|stock|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |673895|35578| 20180101|        1| null|     |673895|35578| 20180101|        1| null|
   |673895|35578| 20180102|        0|  110|     |673895|35578| 20180102|        0|  110|
   |673895|35578| 20180103|        1| null|     |673895|35578| 20180103|        1|  109|
   |673895|35578| 20180104|        0| null|     |673895|35578| 20180104|        0|  109|
   |673895|35578| 20180105|        0|  109|  => |673895|35578| 20180105|        0|  109|
   |673895|35578| 20180106|        1| null|     |673895|35578| 20180106|        1|  108|
   |673895|35578| 20180107|        0|  108|     |673895|35578| 20180107|        0|  108|
   |673895|35578| 20180108|        0| null|     |673895|35578| 20180108|        0|  108|
   |673895|35578| 20180109|        0| null|     |673895|35578| 20180109|        0|  108|
   |673895|35578| 20180110|        1| null|     |673895|35578| 20180110|        1|  107|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+

My expected output is based on the last known non-null value and sales_qty: whenever there is a sale, the stock value should be reduced accordingly. I have tried the following logic:

        my_window = Window.partitionBy('item','store').orderBy('timestamp')
        df = df.withColumn("stock", F.when((F.isnull(F.col('stock'))),F.lag(df.stock).over(my_window)-F.col('sales_qty')).otherwise(F.col('stock')))

But it only works for a single null value. Can someone please help me achieve the expected result?

Note: the quantity does NOT always decrease continuously, so the last non-null value must be used to calculate the new one.

Upvotes: 2

Views: 817

Answers (1)

murtihash

Reputation: 8410

You could try this. I generate two columns: first (the first non-null stock value, 110 here) and stock2 (the running sum of sales_qty, minus 1), then subtract stock2 from first to get your desired stock.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Running window: from the start of the partition up to the current row.
w = Window.partitionBy("item", "store").orderBy("timestamp")
# Whole-partition window, so every row can see the first non-null stock.
w2 = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# stock2: running sales total; the -1 cancels the unit sold on the first row of this sample.
df.withColumn("stock2", F.sum("sales_qty").over(w) - F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first") - F.col("stock2"))\
.drop("stock2", "first")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|  110|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+
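The `- F.lit(1)` above works because exactly one unit is sold up to the first known stock row in this sample. As a rough plain-Python sketch of the same arithmetic, with that offset computed instead of hard-coded (the function name `fill_from_first` is made up for illustration and is not part of PySpark):

```python
from itertools import accumulate


def fill_from_first(sales_qty, stock):
    """Return the first known stock minus the sales accumulated after that row."""
    # Position of the first non-null stock value.
    first_idx = next((i for i, s in enumerate(stock) if s is not None), None)
    if first_idx is None:
        return list(stock)  # nothing known, so nothing can be filled
    running = list(accumulate(sales_qty))  # running total of sales
    offset = running[first_idx]            # sales up to and including the first known row
    first = stock[first_idx]
    return [first - (total - offset) for total in running]


sales = [1, 0, 1, 0, 0, 1, 0, 0, 0, 1]
stock = [None, 110, None, None, 109, None, 108, None, None, None]
print(fill_from_first(sales, stock))
# [110, 110, 109, 109, 109, 108, 108, 108, 108, 107]
```

This only mirrors the single-baseline idea, so it assumes every later known stock value agrees with the running subtraction.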

If you would like to force the first value to null instead of 110 (as shown in your desired output), you could use this. It uses row_number to replace that first 110 with null:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Running window: from the start of the partition up to the current row.
w = Window.partitionBy("item", "store").orderBy("timestamp")
# Whole-partition window, so every row can see the first non-null stock.
w2 = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# stock2: running sales total; the -1 cancels the unit sold on the first row of this sample.
df.withColumn("stock2", F.sum("sales_qty").over(w) - F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first") - F.col("stock2"))\
.withColumn("num", F.row_number().over(w))\
.withColumn("stock", F.when(F.col("num") == 1, F.lit(None)).otherwise(F.col("stock")))\
.drop("stock2", "first", "num")\
.show()


+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+

Additional data INPUT and OUTPUT:

#input1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        3| null|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1| null|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        4| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        3|  106|
|673895|35578| 20180105|        0|  106|
|673895|35578| 20180106|        1|  105|
|673895|35578| 20180107|        0|  105|
|673895|35578| 20180108|        4|  101|
|673895|35578| 20180109|        0|  101|
|673895|35578| 20180110|        1|  100|
+------+-----+---------+---------+-----+


#input2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0| null|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0|  102|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0|   98|
|673895|35578| 20180109|        0|   98|
|673895|35578| 20180110|        1|   97|
+------+-----+---------+---------+-----+

If the stock quantities are NOT continuous, like this:

  df.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2| null|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

You could use this (I compute a dynamic window that restarts at each non-null stock value):

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window.partitionBy("item", "store").orderBy("timestamp")
# stock5 is created below; it changes at every non-null stock row.
w3 = Window.partitionBy("item", "store", "stock5").orderBy("timestamp")

# stock1: stock with nulls as 0 (a genuine stock of 0 is treated like a null).
# stock4/stock5: running group id that starts a new group at each known stock.
# stock6: the known stock that opens the current group.
# sum: sales accumulated on the null rows of the group.
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock4", F.when(F.col("stock1") != 0, F.rank().over(w)).otherwise(F.col("stock1")))\
.withColumn("stock5", F.sum("stock4").over(w))\
.withColumn("stock6", F.sum("stock1").over(w3))\
.withColumn("sum", F.sum(F.when(F.col("stock1") != F.col("stock6"), F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
.withColumn("stock2", F.when(F.col("sales_qty") != 0, F.col("stock6") - F.col("sum")).otherwise(F.col("stock")))\
.withColumn("stock", F.when((F.col("stock2").isNull()) & (F.col("sales_qty") == 0), F.col("stock6") - F.col("sum")).otherwise(F.col("stock2")))\
.drop("stock1", "stock4", "stock5", "stock6", "sum", "stock2")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|    0|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2|  110|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0|  107|
|673895|35578| 20180109|        0|  107|
|673895|35578| 20180110|        1|  106|
+------+-----+---------+---------+-----+
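To sanity-check the Spark result on a small sample, the same rule can be written as a plain-Python loop: every known stock value resets the baseline, and every null row carries the last value forward minus that row's sales. This is only a sketch for checking, not a Spark implementation. The name `fill_from_last_known` is made up for illustration, and it leaves rows before the first known stock as None rather than 0:

```python
def fill_from_last_known(sales_qty, stock):
    """Fill null stock from the last known value, reduced by sales since then."""
    filled = []
    last = None  # last known (or already filled) stock value
    for qty, observed in zip(sales_qty, stock):
        if observed is not None:
            last = observed      # a known value resets the baseline
        elif last is not None:
            last = last - qty    # carry forward, minus this row's sales
        filled.append(last)      # stays None until a first value is known
    return filled


sales = [1, 0, 1, 7, 0, 2, 0, 0, 0, 1]
stock = [None, 110, None, None, 112, None, 107, None, None, None]
print(fill_from_last_known(sales, stock))
# [None, 110, 109, 102, 112, 110, 107, 107, 107, 106]
```

Apart from the first row, this agrees with the table above for the non-continuous sample.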

Upvotes: 3
