Reputation: 245
Hi, I have a dataset that looks like this:
my input:
+----+-------+
| id | flag  |
+----+-------+
| 1  | false |
| 2  | true  |
| 3  | false |
| 4  | true  |
| 5  | false |
| 6  | false |
| 7  | true  |
+----+-------+
output:
+----+-------+---------+
| id | flag  | new_col |
+----+-------+---------+
| 1  | false | 1       |
| 2  | true  | 1       |
| 3  | false | 3       |
| 4  | true  | 3       |
| 5  | false | 5       |
| 6  | false | 6       |
| 7  | true  | 6       |
+----+-------+---------+
Each false value sets new_col to that row's id, and that id carries forward through the following true rows until the next false. Any help, please?
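To pin down the rule, here is a rough plain-Scala sketch of the desired semantics (not Spark code; the sample data is copied from the table above):

```scala
// Forward-fill the id of the most recent row whose flag is false.
val rows = Seq((1, false), (2, true), (3, false), (4, true),
               (5, false), (6, false), (7, true))

// scanLeft threads the "current" id through the sequence: a false flag
// resets it to that row's id, a true flag keeps the previous value.
val newCol = rows.scanLeft(0) { case (prev, (id, flag)) =>
  if (!flag) id else prev
}.tail

println(newCol)  // List(1, 1, 3, 3, 5, 6, 6)
```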
Upvotes: 1
Views: 3316
Reputation: 10082
With a dataset of a smaller size, you could do the following: use when/otherwise with withColumn to create a new column which takes the value of id, or null, depending on the value of flag. In SQL this is equivalent to:
CASE WHEN FLAG = 'FALSE' THEN ID ELSE NULL END AS NEW_COL
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//| 1|false|
//| 2| true|
//| 3| true|
//| 4| true|
//| 5|false|
//| 6| true|
//| 7| true|
//+---+-----+
//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, last, lit, when}

//No partitionBy clause, so all the data will move to a single partition;
//you'll also get a warning about that
val w = Window.orderBy($"id")

//The value of `id` is kept where `flag` is `false`, null otherwise;
//last(..., ignoreNulls = true) over the window then fills the nulls
df.withColumn("new_col", when($"flag" === lit(false), $"id").otherwise(lit(null)))
  .withColumn("new_col", coalesce($"new_col", last($"new_col", true).over(w)))
  .show(false)
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1 |false|1 |
//|2 |true |1 |
//|3 |true |1 |
//|4 |true |1 |
//|5 |false|5 |
//|6 |true |5 |
//|7 |true |5 |
//+---+-----+-------+
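The two-step logic (mark each row with id-or-null, then forward-fill with the last non-null value) can be mimicked in plain Scala with Option, as a rough sketch of what coalesce plus last(..., ignoreNulls = true) computes over the running window. The sample data here is the one from this answer, not the question:

```scala
val rows = Seq((1, false), (2, true), (3, true), (4, true),
               (5, false), (6, true), (7, true))

// Step 1: when/otherwise -> Some(id) where flag is false, None elsewhere
val marked: Seq[Option[Int]] =
  rows.map { case (id, flag) => if (!flag) Some(id) else None }

// Step 2: last over an ordered window ignoring nulls -> keep the most
// recent defined value seen so far
val filled = marked.scanLeft(Option.empty[Int]) { (prev, cur) =>
  cur.orElse(prev)
}.tail

println(filled.flatten)  // List(1, 1, 1, 1, 5, 5, 5)
```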
Upvotes: 3
Reputation: 41957
If you want to go the RDD way, you can move all the data to one executor and loop over it as below:
import spark.implicits._  // needed for toDF() on an RDD of case-class rows

df.rdd.coalesce(1).mapPartitions { iterator =>
  var y = "1"
  iterator.map { x =>
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if (!flag) y = id  // a false flag resets the carried id
    newdf(id, flag, y)
  }
}.toDF()
and for that you need a case class:
case class newdf(id: String, flag: Boolean, new_id: String)
You can do it without a case class too, but I prefer to use one.
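The single-partition stateful loop above can be sketched in plain Scala without Spark; the Row3 case class and the three-row sample here are hypothetical, just to show how a var threads state through an iterator:

```scala
// Plain-Scala sketch of the stateful mapPartitions pattern
case class Row3(id: String, flag: Boolean, newId: String)

val it = Iterator(("1", false), ("2", true), ("3", false))
var y = "1"
val out = it.map { case (id, flag) =>
  if (!flag) y = id  // a false flag resets the carried id
  Row3(id, flag, y)
}.toList  // toList forces the lazy iterator so y is actually updated

println(out.map(_.newId))  // List(1, 1, 3)
```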
Upvotes: -1