Abhijeet Thorat

Reputation: 31

Increment a value dynamically in a PySpark DataFrame

I have the below data frame:

+--------------------+----------------+----------------+-----------+---+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|
+--------------------+----------------+----------------+-----------+---+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|
+--------------------+----------------+----------------+-----------+---+

Use case: I want to add a new column, episode, with the following logic: if rnk is 1, then episode = 1; if rnk > 1, the product is the same, and interaction_date > rx_end_date, then episode = previous episode + 1; otherwise episode = previous episode.
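The rule above can be sketched in plain Python (not Spark; the function name and the list-of-tuples input are my own illustration) for one product's rows taken in rank order:

```python
def assign_episodes(rows):
    """rows: list of (interaction_date, rx_end_date) string tuples in rnk order."""
    episodes = []
    for i, (idate, edate) in enumerate(rows):
        if i == 0:
            # rank 1 always starts episode 1
            episodes.append(1)
        elif idate > edate:
            # ISO yyyy-MM-dd strings compare chronologically, so this
            # detects an interaction past the prescription end date
            episodes.append(episodes[-1] + 1)
        else:
            episodes.append(episodes[-1])
    return episodes

print(assign_episodes([('2009-02-23', '2009-05-22'),
                       ('2010-04-05', '2009-05-22'),
                       ('2009-03-23', '2009-05-22')]))  # [1, 2, 2]
```

This matches the first three prod1 rows of the expected table below.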

The expected result would be:

+--------------------+----------------+----------------+-----------+---+-------+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|      1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|      2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|      2|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|      2|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|      2|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|      3|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|      4|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|      5|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|      6|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|      7|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|      1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|      2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|      3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|      3|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|      4|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|      5|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|      6|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|      7|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|      8|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|      8|
+--------------------+----------------+----------------+-----------+---+-------+

Can I use a Spark window function to implement the above logic, or is there any other Spark DataFrame function that can do this?

Upvotes: 1

Views: 2086

Answers (1)

Prem

Reputation: 11955

Hope this helps!

from pyspark.sql.functions import col, when, lag, last
from pyspark.sql.window import Window
import sys

# 'sc' is the SparkContext provided by the PySpark shell
df = sc.parallelize([
    ['00000000000072128380', 'prod1', '2009-02-23', '2009-05-22', 1],
    ['00000000000072128380', 'prod1', '2010-04-05', '2009-05-22', 2],
    ['00000000000072128380', 'prod1', '2009-03-23', '2009-05-22', 3],
    ['00000000000072128380', 'prod1', '2009-04-20', '2009-05-22', 4],
    ['00000000000072128380', 'prod1', '2009-05-16', '2009-05-22', 5],
    ['00000000000072128380', 'prod1', '2009-06-17', '2009-05-22', 6],
    ['00000000000072128380', 'prod1', '2009-07-15', '2009-05-22', 7],
    ['00000000000072128380', 'prod1', '2009-08-12', '2009-05-22', 8],
    ['00000000000072128380', 'prod1', '2009-09-05', '2009-05-22', 9],
    ['00000000000072128380', 'prod1', '2009-10-06', '2009-05-22', 10],
    ['00000000000072128380', 'prod2', '2009-10-28', '2009-05-22', 1],
    ['00000000000072128380', 'prod2', '2009-12-02', '2009-05-22', 2],
    ['00000000000072128380', 'prod2', '2010-05-10', '2009-05-22', 3],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 4],
    ['00000000000072128380', 'prod2', '2010-07-06', '2009-05-22', 5],
    ['00000000000072128380', 'prod2', '2010-08-03', '2009-05-22', 6],
    ['00000000000072128380', 'prod2', '2010-09-23', '2009-05-22', 7],
    ['00000000000072128380', 'prod2', '2010-10-20', '2009-05-22', 8],
    ['00000000000072128380', 'prod2', '2010-01-29', '2009-05-22', 9],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 10]]).toDF(('patient_gid','interaction_desc', 'interaction_date', 'rx_end_date', 'rnk'))

w = Window.partitionBy(col("interaction_desc")).orderBy(col("rnk"))

# Mark episode boundaries: rnk 1 starts at episode 1; any later row of the
# same product whose interaction_date is past rx_end_date gets its rnk as a
# placeholder; every other row is left null.
df1 = df.withColumn("episode_temp",
                    when(col('rnk') == 1, 1).
                    when((col('rnk') > 1) &
                         (col('interaction_desc') == lag("interaction_desc").over(w)) &
                         (col('interaction_date') > col('rx_end_date')), col('rnk')).
                    otherwise(None))
# Forward-fill: carry the last non-null marker down over the null rows.
df1 = df1.withColumn("episode",
                     last('episode_temp', True).over(w.rowsBetween(-sys.maxsize, 0))
                     ).drop('episode_temp')
df1.show()

The output is:

+--------------------+----------------+----------------+-----------+---+-------+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|      1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|      2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|      2|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|      2|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|      2|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|      6|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|      7|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|      8|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|      9|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|     10|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|      1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|      2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|      3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|      3|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|      5|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|      6|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|      7|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|      8|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|      9|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|      9|
+--------------------+----------------+----------------+-----------+---+-------+

The output is not exactly what you desired, but it is similar and, more importantly, it is monotonically increasing.

Upvotes: 1
