Reputation: 2158
I'm looking for a way to apply a function to my DataFrame using UDF. My DataFrame looks like this:
+--------------------+-----+
| TOPIC|COUNT|
+--------------------+-----+
| [outlook]| 71|
| [AppsOnDemand]| 12|
| [OUTLOOK, OUTLOOK]| 1|
| [SkyPe]| 3|
| [Citrix, VPN]| 1|
| [Citrix]| 31|
| [VPN]| 51|
| [PANDA, panda]| 1|
| [SKYPE, SKYPE]| 2|
| [panda]| 5|
| [Cisco]| 75|
| [télétravail]| 14|
| [vpn]| 4|
| [OUTLOOK]| 212|
|[SKYPE, télétravail]| 2|
| [appsondemand]| 1|
| [WIFI]| 5|
| [CISCO, CISCO]| 4|
| [MOOC]| 2|
| [PANDA, Panda]| 1|
+--------------------+-----+
My objective is to loop over the lists in the "TOPIC" column and change the strings from lowercase to uppercase. So I need a simple Scala function that takes an array of strings as input and returns the uppercase version of those strings. When the column held plain strings, it was very simple. I just did this:
import org.apache.spark.sql.functions.{array, col, count, lit, udf, upper}
DF.select($"COUNT", upper($"TOPIC")).show()
I was trying this, but it doesn't work:
def myFunc(context: Array[Seq[String]]) = udf {
  (topic: Seq[String]) => context.toString().toUpperCase
}
val Df = (df
  .where('TOPIC.isNotNull)
  .select($"TOPIC", $"COUNT",
    myFunc(context)($"TOPIC").alias("NEW_TOPIC"))
)
Upvotes: 0
Views: 2413
Reputation: 2371
Define your function as follows:
import org.apache.spark.sql.functions._
val arrayUpperCase = udf[Seq[String], Seq[String]](_.map(_.toUpperCase))
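The lambda wrapped by the udf is ordinary Scala, so you can sanity-check it without a SparkSession (a minimal sketch; `arrayUpperCaseFn` is just an illustrative name):

```scala
// The core transform the udf wraps: uppercase every element of a Seq[String].
val arrayUpperCaseFn: Seq[String] => Seq[String] = _.map(_.toUpperCase)

println(arrayUpperCaseFn(Seq("Citrix", "VPN")))  // List(CITRIX, VPN)
```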
and then
df.select($"TOPIC", $"COUNT", arrayUpperCase($"TOPIC").alias("NEW_TOPIC")).show(false)
returns
+--------------------+-----+--------------------+
|TOPIC |COUNT|NEW_TOPIC |
+--------------------+-----+--------------------+
|[outlook] |71 |[OUTLOOK] |
|[AppsOnDemand] |12 |[APPSONDEMAND] |
|[OUTLOOK, OUTLOOK] |1 |[OUTLOOK, OUTLOOK] |
|[SkyPe] |3 |[SKYPE] |
|[Citrix, VPN] |1 |[CITRIX, VPN] |
|[Citrix] |31 |[CITRIX] |
|[VPN] |51 |[VPN] |
|[PANDA, panda] |1 |[PANDA, PANDA] |
|[SKYPE, SKYPE] |2 |[SKYPE, SKYPE] |
|[panda] |5 |[PANDA] |
|[Cisco] |75 |[CISCO] |
|[télétravail] |14 |[TÉLÉTRAVAIL] |
|[vpn] |4 |[VPN] |
|[OUTLOOK] |212 |[OUTLOOK] |
|[SKYPE, télétravail]|2 |[SKYPE, TÉLÉTRAVAIL]|
|[appsondemand] |1 |[APPSONDEMAND] |
|[WIFI] |5 |[WIFI] |
|[CISCO, CISCO] |4 |[CISCO, CISCO] |
|[MOOC] |2 |[MOOC] |
|[PANDA, Panda] |1 |[PANDA, PANDA] |
+--------------------+-----+--------------------+
Upvotes: 3
Reputation: 41957
You should write a udf function as below:
import org.apache.spark.sql.functions._
def upperUdf = udf((array: collection.mutable.WrappedArray[String]) => array.map(_.toUpperCase))
and call it using withColumn as:
df.withColumn("TOPIC", upperUdf($"TOPIC"))
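A Scala udf receives an array column as a Seq-compatible wrapper (WrappedArray on Scala 2.12), so typing the parameter as plain `Seq[String]` works just as well and is less tied to a Scala version. The element-wise transform itself is ordinary Scala (a standalone sketch; `upperAll` is an illustrative name):

```scala
// Typing the parameter as Seq[String] accepts WrappedArray and any other Seq,
// so the same body works regardless of the concrete wrapper Spark supplies.
def upperAll(xs: Seq[String]): Seq[String] = xs.map(_.toUpperCase)

println(upperAll(Vector("SkyPe", "télétravail")))  // Vector(SKYPE, TÉLÉTRAVAIL)
```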
You should get the following output:
+--------------------+-----+
|TOPIC |COUNT|
+--------------------+-----+
|[OUTLOOK] |71 |
|[APPSONDEMAND] |12 |
|[OUTLOOK, OUTLOOK] |1 |
|[SKYPE] |3 |
|[CITRIX, VPN] |1 |
|[CITRIX] |31 |
|[VPN] |51 |
|[PANDA, PANDA] |1 |
|[SKYPE, SKYPE] |2 |
|[PANDA] |5 |
|[CISCO] |75 |
|[TÉLÉTRAVAIL] |14 |
|[VPN] |4 |
|[OUTLOOK] |212 |
|[SKYPE, TÉLÉTRAVAIL]|2 |
|[APPSONDEMAND] |1 |
|[WIFI] |5 |
|[CISCO, CISCO] |4 |
|[MOOC] |2 |
|[PANDA, PANDA] |1 |
+--------------------+-----+
Upvotes: 2