Aditya Seth

Reputation: 53

How to extract a row from a data frame and apply transforms

import org.apache.spark.sql.Row

def fun(row: Row): Seq[String] = {
     // some logic
}

+-------+------------+----+-------+
|   name|High on life|waka|my love|
+-------+------------+----+-------+
|beatles|           0|   0|      0|
|  romeo|           0|   0|      0|
+-------+------------+----+-------+
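
For reference, the sample frame above could be built like this (a minimal sketch; `df` and the integer flag columns are just my example setup):

    import spark.implicits._ // assumes a SparkSession named `spark` is in scope

    val df = Seq(
      ("beatles", 0, 0, 0),
      ("romeo",   0, 0, 0)
    ).toDF("name", "High on life", "waka", "my love")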

// fun will return for row 1:
Seq("waka", "High on life")
// and for row 2:
Seq("waka", "my love")

+-------+------------+----+-------+
|   name|High on life|waka|my love|
+-------+------------+----+-------+
|beatles|           1|   1|      0|
|  romeo|           0|   1|      1|
+-------+------------+----+-------+

Basically, I want a suggestion: how can I increment or change a column's value for that particular row and column?

I am a newcomer to Spark and Scala, so could you please also tell me how I can iterate over the row?

Upvotes: 2

Views: 87

Answers (1)

Chema

Reputation: 2828

You can change column values based on preconditions or arbitrarily, and filter or drop rows based on preconditions. A transformation on a DataFrame or RDD always returns a new DataFrame or RDD.

As an example:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession // create the SparkSession
      .builder()
      .appName("MyApp")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "4") // a more reasonable number of partitions for this data
      .config("spark.app.id", "MyApp") // to silence the Metrics warning
      .getOrCreate()

    import spark.implicits._ // needed for .toDF below

    val sc = spark.sparkContext // get the SparkContext

    val data = sc.textFile(input) // read a file into an RDD[String]; `input` is the path to the file
    val head = data.first() // header line

    val multiline = data
      .filter(line => line != head) // remove the header
      .map(line => line.split('|')) // RDD[String] => RDD[Array[String]]
      .map(arr => { // based on preconditions, transform to RDD[(String, String)]
        val sInt: Int = makeInt(arr(0)) // makeInt is a user-defined helper that parses a String to Int
        if (sInt < 0) (sInt.toString, arr(0))
        else (arr(0), arr(1))
      })
      .toDF("column1", "column2") // RDD to DataFrame

    multiline.show() // show the content of the DataFrame

    /*
     map, filter and show are HOFs (higher-order functions) that iterate over the RDD or
     DataFrame. There are many more HOFs for transforming an RDD or DataFrame.
     API documentation: http://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.rdd.RDD
    */
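
For the specific table in the question, here is a minimal sketch of one possible approach (assuming `fun` returns the names of the columns to set to 1, the flag columns are integers, and `df` is the DataFrame shown in the question):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{array_contains, col, lit, struct, udf, when}

    // Wrap fun in a UDF; passing all columns as a struct hands the whole row
    // to fun as a Row.
    val funUdf = udf { row: Row => fun(row) }

    val flagCols = df.columns.filter(_ != "name") // every column except "name"

    val withHits = df.withColumn("hits", funUdf(struct(df.columns.map(col): _*)))

    // For each flag column, set it to 1 when fun listed that column for the row;
    // otherwise keep the existing value. Each withColumn returns a new DataFrame.
    val result = flagCols.foldLeft(withHits) { (acc, c) =>
      acc.withColumn(c, when(array_contains(col("hits"), c), lit(1)).otherwise(col(c)))
    }.drop("hits")

    result.show()

foldLeft simply threads the frame through one withColumn transformation per flag column, and the temporary hits column is dropped at the end.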

Upvotes: 1
