Salih

Reputation: 719

PySpark Filling Some Specific Missing Values

My spark dataframe is;

Client  Date        Due_Day
A      2017-01-01   Null
A      2017-02-01   Null
A      2017-03-01   Null
A      2017-04-01   Null
A      2017-05-01   Null
A      2017-06-01   35
A      2017-07-01   Null
A      2017-08-01   Null
A      2017-09-01   Null
A      2017-10-01   Null
A      2017-11-01   Null
A      2017-12-01   Null
B      2017-01-01   Null
B      2017-02-01   Null
B      2017-03-01   Null
B      2017-04-01   Null
B      2017-05-01   Null
B      2017-06-01   Null
B      2017-07-01   Null
B      2017-08-01   Null
B      2017-09-01   Null
B      2017-10-01   78
B      2017-11-01   Null
B      2017-12-01   Null

There is exactly one non-NULL Due_Day per Client in the dataframe.

Desired output is;

Client  Date    Due_Day    Result
A   2017-01-01  Null       Null
A   2017-02-01  Null       Null
A   2017-03-01  Null       Null
A   2017-04-01  Null       Null
A   2017-05-01  Null       Null         -> This month should remain Null
A   2017-06-01  35         35
A   2017-07-01  Null       Paid         -> The month after should be 'Paid'
A   2017-08-01  Null       OK           -> Following months should be 'OK'
A   2017-09-01  Null       OK
A   2017-10-01  Null       OK
A   2017-11-01  Null       OK
A   2017-12-01  Null       OK
B   2017-01-01  Null       Null
B   2017-02-01  Null       Null
B   2017-03-01  Null       Null
B   2017-04-01  Null       Null
B   2017-05-01  Null       Null
B   2017-06-01  Null       Null
B   2017-07-01  Null       Null
B   2017-08-01  Null       Null
B   2017-09-01  Null       Null
B   2017-10-01  78         78
B   2017-11-01  Null       Paid         -> The month after
B   2017-12-01  Null       OK

For a client, the month after the non-NULL Due_Day should be labeled 'Paid', and the following months should be labeled 'OK' until the end of the year. The earlier months should remain Null.

Could you please help me with PySpark code for this?

Upvotes: 0

Views: 67

Answers (1)

dsk

Reputation: 2003

Below is a working solution for you; I will try to explain the logic behind it -

  1. From the point where the Due_Day column (col3 here) has a value, we do a forward fill so that the following rows carry the same value for the later calculation.
  2. Once the forward-filled column is in place, the rest becomes easy and we can write the underlying logic.

Create the DF Here

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

df = spark.createDataFrame(
    [("A", "2017-01-01", None), ("A", "2017-02-01", None), ("A", "2017-03-01", 35),
     ("A", "2017-04-01", None), ("A", "2017-05-01", None), ("B", "2017-01-01", None),
     ("B", "2017-02-01", 78), ("B", "2017-03-01", None), ("B", "2017-04-01", None),
     ("B", "2017-05-01", None)],
    ["col1", "col2", "col3"],
)
df.show(truncate=False) 
+----+----------+----+
|col1|col2      |col3|
+----+----------+----+
|A   |2017-01-01|null|
|A   |2017-02-01|null|
|A   |2017-03-01|35  |
|A   |2017-04-01|null|
|A   |2017-05-01|null|
|B   |2017-01-01|null|
|B   |2017-02-01|78  |
|B   |2017-03-01|null|
|B   |2017-04-01|null|
|B   |2017-05-01|null|
+----+----------+----+

Forward fill here, in order to fill the next rows with the same value

# With an orderBy and no explicit frame, the window runs from the start of the
# partition to the current row, so last(..., ignorenulls=True) carries the most
# recent non-null value forward
w = W.partitionBy("col1").orderBy("col2")
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))
df.show()
+----+----------+----+----------+
|col1|      col2|col3|filled_col|
+----+----------+----+----------+
|   B|2017-01-01|null|      null|
|   B|2017-02-01|  78|        78|
|   B|2017-03-01|null|        78|
|   B|2017-04-01|null|        78|
|   B|2017-05-01|null|        78|
|   A|2017-01-01|null|      null|
|   A|2017-02-01|null|      null|
|   A|2017-03-01|  35|        35|
|   A|2017-04-01|null|        35|
|   A|2017-05-01|null|        35|
+----+----------+----+----------+
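As an aside, the same forward fill can be written with an explicit window frame, which makes the mechanics visible. This is just a sketch and is equivalent here, since the default frame already ends at the current row:

w_explicit = (W.partitionBy("col1").orderBy("col2")
                .rowsBetween(W.unboundedPreceding, W.currentRow))
# last non-null col3 from the start of the partition up to the current row
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w_explicit))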

We will assign a row_number to each row. Because filled_col is part of the partition key, the rows before the payment (where filled_col is still null) form their own partition; in the partition that holds the payment, the paying month gets row number 1, so row number 2 becomes 'Paid' and every later row becomes 'OK'.

w2 = W.partitionBy("col1", "filled_col").orderBy("col2")
df = df.withColumn("rnk", F.row_number().over(w2))
df = df.withColumn(
    "Result",
    F.when(F.col("filled_col").isNotNull() & (F.col("rnk") == 2), F.lit("Paid"))
     .when(F.col("filled_col").isNotNull() & (F.col("rnk") > 2), F.lit("OK")),
)
df.show()
+----+----------+----+----------+---+------+
|col1|      col2|col3|filled_col|rnk|Result|
+----+----------+----+----------+---+------+
|   B|2017-01-01|null|      null|  1|  null|
|   B|2017-02-01|  78|        78|  1|  null|
|   B|2017-03-01|null|        78|  2|  Paid|
|   B|2017-04-01|null|        78|  3|    OK|
|   B|2017-05-01|null|        78|  4|    OK|
|   A|2017-01-01|null|      null|  1|  null|
|   A|2017-02-01|null|      null|  2|  null|
|   A|2017-03-01|  35|        35|  1|  null|
|   A|2017-04-01|null|        35|  2|  Paid|
|   A|2017-05-01|null|        35|  3|    OK|
+----+----------+----+----------+---+------+

Select the final columns of your choice

# coalesce keeps the paid amount for the paying month; unlabeled rows stay Null
df = df.withColumn("Result", F.coalesce("Result", "col3"))
df.select("col1", "col2", "col3", "Result").orderBy("col1").show()
+----+----------+----+------+
|col1|      col2|col3|Result|
+----+----------+----+------+
|   A|2017-01-01|null|  null|
|   A|2017-02-01|null|  null|
|   A|2017-03-01|  35|    35|
|   A|2017-04-01|null|  Paid|
|   A|2017-05-01|null|    OK|
|   B|2017-01-01|null|  null|
|   B|2017-02-01|  78|    78|
|   B|2017-03-01|null|  Paid|
|   B|2017-04-01|null|    OK|
|   B|2017-05-01|null|    OK|
+----+----------+----+------+
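For completeness, here is a sketch of the same steps chained together against the question's original column names (Client, Date, Due_Day). It is untested against your exact data, so treat it as a starting point:

w  = W.partitionBy("Client").orderBy("Date")
w2 = W.partitionBy("Client", "filled_col").orderBy("Date")

result = (
    df.withColumn("filled_col", F.last("Due_Day", ignorenulls=True).over(w))
      .withColumn("rnk", F.row_number().over(w2))
      .withColumn(
          "Result",
          F.when(F.col("filled_col").isNotNull() & (F.col("rnk") == 2), F.lit("Paid"))
           .when(F.col("filled_col").isNotNull() & (F.col("rnk") > 2), F.lit("OK"))
           .otherwise(F.col("Due_Day")),  # keeps the paid amount, and Null stays Null
      )
      .select("Client", "Date", "Due_Day", "Result")
      .orderBy("Client", "Date")
)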

Upvotes: 1
