Yali Pollak

Reputation: 35

Spark Scala - Need to iterate over column in dataframe

I have the following dataframe:

+---+----------------+
|id |job_title       |
+---+----------------+
|1  |ceo             |
|2  |product manager |
|3  |surfer          |
+---+----------------+

I want to take a column from the dataframe and create another column called 'rank' that classifies each job title:

+---+----------------+-------+
|id |job_title       | rank  |
+---+----------------+-------+
|1  |ceo             |c-level|
|2  |product manager |manager|
|3  |surfer          |other  |
+---+----------------+-------+

--- UPDATED ---

What I have tried so far is:

def func (col: column) : Column = {
val cLevel = List("ceo","cfo")
val managerLevel = List("manager","team leader")

when (col.contains(cLevel), "C-level")
.otherwise(when(col.contains(managerLevel),"manager").otherwise("other"))}

Currently I get this error:

type mismatch;
found   : Boolean
required: org.apache.spark.sql.Column

and I think I also have other problems in the code. Sorry, but I'm at a beginner level with Scala on Spark.

Upvotes: 0

Views: 3505

Answers (2)

Ramesh Maharjan

Reputation: 41987

You can use the built-in when/otherwise functions for that case:

import org.apache.spark.sql.functions._
def func = when(col("job_title").contains("chief") || col("job_title").contains("ceo"), "c-level")
  .otherwise(when(col("job_title").contains("manager"), "manager")
    .otherwise("other"))

and you can call the function using withColumn:

df.withColumn("rank", func).show(false)

which should give you

+---+---------------+-------+
|id |job_title      |rank   |
+---+---------------+-------+
|1  |ceo            |c-level|
|2  |product manager|manager|
|3  |surfer         |other  |
+---+---------------+-------+

I hope the answer is helpful

Updated

I see that you have updated your post with your attempt: you create lists of levels and want to validate the job titles against those lists. For that case you can write a udf function:

val cLevel = List("ceo","cfo")
val managerLevel = List("manager","team leader")

import org.apache.spark.sql.functions._
def rankUdf = udf((jobTitle: String) => jobTitle match {
  case x if(cLevel.exists(_.contains(x)) || cLevel.exists(x.contains(_))) => "C-Level"
  case x if(managerLevel.exists(_.contains(x)) || managerLevel.exists(x.contains(_))) => "manager"
  case _ => "other"
})

df.withColumn("rank", rankUdf(col("job_title"))).show(false)

which should give you your desired output
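
The list-matching logic inside the udf can be tried in isolation as plain Scala, without Spark. This is just the body of rankUdf above, extracted as an ordinary function over the same cLevel/managerLevel lists:

```scala
// Pure classification logic, identical to the match inside rankUdf above
val cLevel = List("ceo", "cfo")
val managerLevel = List("manager", "team leader")

def rank(jobTitle: String): String = jobTitle match {
  // match if any level string contains the title, or the title contains a level string
  case x if cLevel.exists(_.contains(x)) || cLevel.exists(x.contains(_)) => "C-Level"
  case x if managerLevel.exists(_.contains(x)) || managerLevel.exists(x.contains(_)) => "manager"
  case _ => "other"
}

// rank("ceo") == "C-Level"; rank("product manager") == "manager"; rank("surfer") == "other"
```

Testing the logic this way first makes it easier to debug before wrapping it in a udf, since errors inside a udf surface as opaque task failures.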

Upvotes: 2

vaquar khan

Reputation: 11489

val df = sc.parallelize(Seq(
  (1, "ceo"),
  (2, "product manager"),
  (3, "surfer"),
  (4, "Vaquar khan")
)).toDF("id", "job_title")

df.show()

// option 1: rank by id using a window function
df.createOrReplaceTempView("user_details")

sqlContext.sql("SELECT job_title, RANK() OVER (ORDER BY id) AS rank FROM user_details").show

// option 2: join against a lookup table of job_title -> rank
val df1 = sc.parallelize(Seq(
  ("ceo", "c-level"),
  ("product manager", "manager"),
  ("surfer", "other"),
  ("Vaquar khan", "Problem solver")
)).toDF("job_title", "ranks")
df1.show()
df1.createOrReplaceTempView("user_rank")

sqlContext.sql("SELECT user_details.id, user_details.job_title, user_rank.ranks FROM user_rank JOIN user_details ON user_rank.job_title = user_details.job_title ORDER BY user_details.id").show

Results:

+---+---------------+
| id|      job_title|
+---+---------------+
|  1|            ceo|
|  2|product manager|
|  3|         surfer|
|  4|    Vaquar khan|
+---+---------------+

+---------------+----+
|      job_title|rank|
+---------------+----+
|            ceo|   1|
|product manager|   2|
|         surfer|   3|
|    Vaquar khan|   4|
+---------------+----+

+---------------+--------------+
|      job_title|         ranks|
+---------------+--------------+
|            ceo|       c-level|
|product manager|       manager|
|         surfer|         other|
|    Vaquar khan|Problem solver|
+---------------+--------------+

+---+---------------+--------------+
| id|      job_title|         ranks|
+---+---------------+--------------+
|  1|            ceo|       c-level|
|  2|product manager|       manager|
|  3|         surfer|         other|
|  4|    Vaquar khan|Problem solver|
+---+---------------+--------------+

df: org.apache.spark.sql.DataFrame = [id: int, job_title: string]
df1: org.apache.spark.sql.DataFrame = [job_title: string, ranks: string]

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
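
Note that the inner join above drops any job_title that does not appear in the lookup dataframe df1. The core idea is a keyed lookup; as a plain-Scala sketch of the same lookup with a default for unknown titles (the Map contents mirror df1, and the "other" fallback is an assumption for illustration, not part of the original answer):

```scala
// Keyed lookup mirroring what the join does per row.
// The "other" default is an assumption; the inner join in the answer
// would instead drop rows with no matching job_title.
val ranks = Map(
  "ceo"             -> "c-level",
  "product manager" -> "manager",
  "surfer"          -> "other",
  "Vaquar khan"     -> "Problem solver"
)

def lookupRank(jobTitle: String): String =
  ranks.getOrElse(jobTitle, "other")
```

In Spark terms, keeping unmatched titles would correspond to a LEFT JOIN from user_details with a default filled in for nulls.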

Upvotes: 0
