Reputation: 139
Hi, my question is somewhat related to this one (Fill in null with previously known good value with pyspark), but there is a slight change of requirement in my problem:
data: expected output:
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock| | item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null| |673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110| |673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null| |673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| null| |673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109| => |673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| null| |673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108| |673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| null| |673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| null| |673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| null| |673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
My expected output is based on the last known non-null stock value and sales_qty: if there is a sales_qty, the stock value should be adjusted accordingly. I have tried the following logic
my_window = Window.partitionBy('item','store').orderBy('timestamp')
df = df.withColumn("stock", F.when((F.isnull(F.col('stock'))),F.lag(df.stock).over(my_window)-F.col('sales_qty')).otherwise(F.col('stock')))
but it only works for a single null value in a row. Can someone please help me achieve the expected result?
Note: the stock quantity does NOT always decrease continuously, so the last non-null value needs to be used to calculate the new one.
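For reference, a minimal sketch to rebuild the sample data above (column types are assumed, adjust as needed):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# stock is nullable; Spark infers its type from the non-null values such as 110
data = [
    (673895, 35578, 20180101, 1, None),
    (673895, 35578, 20180102, 0, 110),
    (673895, 35578, 20180103, 1, None),
    (673895, 35578, 20180104, 0, None),
    (673895, 35578, 20180105, 0, 109),
    (673895, 35578, 20180106, 1, None),
    (673895, 35578, 20180107, 0, 108),
    (673895, 35578, 20180108, 0, None),
    (673895, 35578, 20180109, 0, None),
    (673895, 35578, 20180110, 1, None),
]
df = spark.createDataFrame(data, ["item", "store", "timestamp", "sales_qty", "stock"])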
Upvotes: 2
Views: 817
Reputation: 8410
You could try this. I basically generate two columns: first (the first non-null stock value, 110 here) and stock2 (the running sum of sales_qty), and then subtract them from each other to get your desired stock.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy("item", "store").orderBy("timestamp")
w2 = Window().partitionBy("item", "store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# "first" is the first non-null stock value (110 here); "stock2" is the running sum of
# sales_qty, offset by 1 so it is 0 on the row of that first reading; their difference
# rebuilds stock. ("stock1" is computed but not used in this variant.)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
  .withColumn("stock2", F.sum("sales_qty").over(w) - F.lit(1))\
  .withColumn("first", F.first("stock", True).over(w2))\
  .withColumn("stock", F.col("first") - F.col("stock2"))\
  .drop("stock1", "stock2", "first")\
  .show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 110|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
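Note that the - F.lit(1) works here because the cumulative sales_qty on the row holding the first non-null stock reading happens to be 1. A minimal sketch that derives the offset instead of hard-coding it (the anchor_* column names are just illustrative, not part of the code above):
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy("item", "store").orderBy("timestamp")
w2 = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# running sales total, the first non-null stock reading, and the running total on that reading's row
df.withColumn("cum_sales", F.sum("sales_qty").over(w))\
  .withColumn("anchor_stock", F.first("stock", True).over(w2))\
  .withColumn("anchor_sales", F.first(F.when(F.col("stock").isNotNull(), F.col("cum_sales")), True).over(w2))\
  .withColumn("stock", F.col("anchor_stock") - (F.col("cum_sales") - F.col("anchor_sales")))\
  .drop("cum_sales", "anchor_stock", "anchor_sales")\
  .show()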
If you would like to force your first value to null instead of 110 (as shown in your desired output), you could use this (it basically uses row_number to replace that first 110 value with null):
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy("item", "store").orderBy("timestamp")
w2 = Window().partitionBy("item", "store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# same as above, with an extra row_number column ("num") so the first row of each
# item/store group can be set back to null
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
  .withColumn("stock2", F.sum("sales_qty").over(w) - F.lit(1))\
  .withColumn("first", F.first("stock", True).over(w2))\
  .withColumn("stock", F.col("first") - F.col("stock2"))\
  .withColumn("num", F.row_number().over(w))\
  .withColumn("stock", F.when(F.col("num") == 1, F.lit(None)).otherwise(F.col("stock")))\
  .drop("stock1", "stock2", "first", "num")\
  .show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
Additional example INPUT and OUTPUT data:
#input1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 3| null|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| null|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 4| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 3| 106|
|673895|35578| 20180105| 0| 106|
|673895|35578| 20180106| 1| 105|
|673895|35578| 20180107| 0| 105|
|673895|35578| 20180108| 4| 101|
|673895|35578| 20180109| 0| 101|
|673895|35578| 20180110| 1| 100|
+------+-----+---------+---------+-----+
#input2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| null|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| 102|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| 98|
|673895|35578| 20180109| 0| 98|
|673895|35578| 20180110| 1| 97|
+------+-----+---------+---------+-----+
IF the stock quantities are NOT continuous, like this:
df.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| null|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
You could use this (I basically compute a dynamic window for each last non-null stock value):
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy("item", "store").orderBy("timestamp")
w3 = Window().partitionBy("item", "store", "stock5").orderBy("timestamp")

# "stock5" is a group id that changes at every non-null stock row, so w3 spans the rows
# from one stock reading up to (but not including) the next; "stock6" carries that reading
# forward within the group, and "sum" accumulates the sales booked after it
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
  .withColumn("stock4", F.when(F.col("stock1") != 0, F.rank().over(w)).otherwise(F.col("stock1")))\
  .withColumn("stock5", F.sum("stock4").over(w))\
  .withColumn("stock6", F.sum("stock1").over(w3))\
  .withColumn("sum", F.sum(F.when(F.col("stock1") != F.col("stock6"), F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
  .withColumn("stock2", F.when(F.col("sales_qty") != 0, F.col("stock6") - F.col("sum")).otherwise(F.col("stock")))\
  .withColumn("stock", F.when((F.col("stock2").isNull()) & (F.col("sales_qty") == 0), F.col("stock6") - F.col("sum")).otherwise(F.col("stock2")))\
  .drop("stock1", "stock4", "stock5", "stock6", "sum", "stock2")\
  .show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 0|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| 110|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| 107|
|673895|35578| 20180109| 0| 107|
|673895|35578| 20180110| 1| 106|
+------+-----+---------+---------+-----+
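An alternative sketch for this non-continuous case (not the code above, just a plain forward fill of the latest reading; the last_* column names are chosen for illustration): anchor on the most recent non-null stock value and subtract the sales booked since it.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy("item", "store").orderBy("timestamp")

# carry the latest non-null stock reading forward and subtract the sales recorded after it
df.withColumn("cum_sales", F.sum("sales_qty").over(w))\
  .withColumn("last_stock", F.last("stock", True).over(w))\
  .withColumn("last_sales", F.last(F.when(F.col("stock").isNotNull(), F.col("cum_sales")), True).over(w))\
  .withColumn("stock", F.col("last_stock") - (F.col("cum_sales") - F.col("last_sales")))\
  .drop("cum_sales", "last_stock", "last_sales")\
  .show()
This leaves the very first row null (no reading has been seen yet), whereas the output above shows 0 there.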
Upvotes: 3