sandevfares

Reputation: 245

How to set an incrementing id over a set of rows with respect to a column value in Spark

Hi, I have a dataset that looks like this:

My input:

+----------+----------------+
|  id      |  flag          |
+----------+----------------+
|  1       | false          |
|  2       | true           |
|  3       | false          |
|  4       | true           |
|  5       | false          |
|  6       | false          |
|  7       | true           |
+----------+----------------+

Expected output:

+----------+----------------+----------------------------+
|  id      |  flag          |  new_col                   |
+----------+----------------+----------------------------+
|  1       | false          |      1                     |
|  2       | true           |      1                     |
|  3       | false          |      3                     |
|  4       | true           |      3                     |
|  5       | false          |      5                     |
|  6       | false          |      6                     |
|  7       | true           |      6                     |
+----------+----------------+----------------------------+

Each false value resets new_col to that row's id, and the following true rows carry that value forward. Any help please?

Upvotes: 1

Views: 3316

Answers (2)

philantrovert

Reputation: 10082

With a dataset of smaller size, you could do the following:

  1. Use when-otherwise with withColumn to create a new column that takes the value of id or null, depending on the value of flag. In SQL this is equivalent to:
CASE WHEN FLAG = FALSE THEN ID ELSE NULL END AS NEW_COL
  2. Then use coalesce to replace the nulls, calling last over a Window to get the last non-null value:
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//|  1|false|
//|  2| true|
//|  3| true|
//|  4| true|
//|  5|false|
//|  6| true|
//|  7| true|
//+---+-----+

//Imports for Window, when/coalesce/last/lit, and the $ column syntax
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

//No partitionBy clause, so all the data will move to a single partition
//You'll also get a warning about that
val w = Window.orderBy($"id")

//new_col takes the value of `id` where `flag` is `false` and null elsewhere
//last is then called over the window to fill the null values
df.withColumn("new_col", when($"flag" === lit(false), $"id").otherwise(null))
  .withColumn("new_col", coalesce($"new_col", last($"new_col", true).over(w)))
  .show
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1  |false|1      |
//|2  |true |1      |
//|3  |true |1      |
//|4  |true |1      |
//|5  |false|5      |
//|6  |true |5      |
//|7  |true |5      |
//+---+-----+-------+
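
As a side note, the two withColumn calls can be collapsed into one: since id itself is the non-null value on the false rows, passing the when expression straight to last with ignoreNulls = true produces the same result. A minimal sketch, assuming the same df and window w as above:

//single pass: take `id` on false rows, null otherwise, then carry the
//last non-null value forward over the window
df.withColumn("new_col", last(when($"flag" === false, $"id"), ignoreNulls = true).over(w))
  .show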

Upvotes: 3

Ramesh Maharjan

Reputation: 41957

If you want to go the RDD way, you can pull all the data onto one executor and run a for loop over it as below:

import spark.implicits._   //needed for toDF()

df.rdd.coalesce(1).mapPartitions(iterator => {
  //holds the id of the most recent row where flag was false
  var y = "1"
  for (x <- iterator) yield {
    val id = x.getAs[String]("id")   //assumes id is stored as a String
    val flag = x.getAs[Boolean]("flag")
    if (!flag) y = id                //a false flag resets the carried id
    newdf(id, flag, y)
  }
}).toDF()

and for that you would need a case class

case class newdf(id:String, flag:Boolean, new_id:String)

You can do it without a case class too, but I prefer to use one.
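
For completeness, a minimal sketch of the variant without a case class, assuming the same df and an active SparkSession named spark; it builds plain Rows and supplies an explicit schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

//schema matching the case class above; adjust the types if `id` is not a String
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("flag", BooleanType),
  StructField("new_id", StringType)
))

val resultRdd = df.rdd.coalesce(1).mapPartitions { iterator =>
  var y = "1"
  iterator.map { x =>
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if (!flag) y = id
    Row(id, flag, y)   //plain Row instead of the case class
  }
}
spark.createDataFrame(resultRdd, schema)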

Upvotes: -1
