Reputation: 1345
I have a dataframe like the one below. I want a new column called cutofftype which, instead of the current monotonically increasing number, should reset to 1 every time the ID column changes.
df = df.orderBy("ID", "date").withColumn("cutofftype", monotonically_increasing_id() + 1)
+------+---------------+----------+
| ID | date |cutofftype|
+------+---------------+----------+
| 54441| 2016-06-20| 1|
| 54441| 2016-06-27| 2|
| 54441| 2016-07-04| 3|
| 54441| 2016-07-11| 4|
| 54500| 2016-05-02| 5|
| 54500| 2016-05-09| 6|
| 54500| 2016-05-16| 7|
| 54500| 2016-05-23| 8|
| 54500| 2016-06-06| 9|
| 54500| 2016-06-13| 10|
+------+---------------+----------+
The target is as below:
+------+---------------+----------+
| ID | date |cutofftype|
+------+---------------+----------+
| 54441| 2016-06-20| 1|
| 54441| 2016-06-27| 2|
| 54441| 2016-07-04| 3|
| 54441| 2016-07-11| 4|
| 54500| 2016-05-02| 1|
| 54500| 2016-05-09| 2|
| 54500| 2016-05-16| 3|
| 54500| 2016-05-23| 4|
| 54500| 2016-06-06| 5|
| 54500| 2016-06-13| 6|
+------+---------------+----------+
I know this can be done with for loops, but I want to do it without them. Is there a way?
Upvotes: 0
Views: 525
Reputation: 13541
This is a simple partition-by problem; you should use a window function.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Partition by ID and order by date, so row_number() restarts at 1 for each ID
val w = Window.partitionBy("ID").orderBy("date")
df.withColumn("cutofftype", row_number().over(w)).show()
+-----+----------+----------+
| ID| date|cutofftype|
+-----+----------+----------+
|54500|2016-05-02| 1|
|54500|2016-05-09| 2|
|54500|2016-05-16| 3|
|54500|2016-05-23| 4|
|54500|2016-06-06| 5|
|54500|2016-06-13| 6|
|54441|2016-06-20| 1|
|54441|2016-06-27| 2|
|54441|2016-07-04| 3|
|54441|2016-07-11| 4|
+-----+----------+----------+
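Note that the rows above come out grouped by partition (ID 54500 first). If you also want the final output ordered like the target table in the question, a minimal sketch (assuming the same df and window w from above) is to add an explicit sort after the window column is computed:

// Compute the per-ID row number, then sort the result by (ID, date)
// so it is displayed in the same order as the target table
df.withColumn("cutofftype", row_number().over(w))
  .orderBy("ID", "date")
  .show()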
Upvotes: 1