Reputation: 115
I have a dataframe containing two columns: one is the data and the other is the character count of that data field.
Data Count
Hello 5
How 3
World 5
I want to change the value of the data column based on the value in the count column. How can this be achieved? I tried this using a udf:
invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("value"),invalidrecords("a_cnt")))
This seems to fail. Is this the correct way to do it?
Upvotes: 5
Views: 21805
Reputation: 41957
Here's an easy way of doing it.
First you create a dataframe:
import sqlContext.implicits._   // needed for the toDF conversion below
val invalidrecords = Seq(
("Hello", 5),
("How", 3),
("World", 5)
).toDF("Data", "Count")
You should have:
+-----+-----+
|Data |Count|
+-----+-----+
|Hello|5 |
|How |3 |
|World|5 |
+-----+-----+
Then you define the udf function as:
import org.apache.spark.sql.functions._
def appendDelimiterError = udf((data: String, count: Int) => "value with error" )
And you call it using withColumn as:
invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)
You should get the following output:
+-----+-----+----------------+
|Data |Count|value |
+-----+-----+----------------+
|Hello|5 |value with error|
|How |3 |value with error|
|World|5 |value with error|
+-----+-----+----------------+
You can write your own logic inside the udf function instead of returning a fixed string.
Edited
Answering the requirement in your comment below requires changing the udf function and the withColumn call as below:
// flag rows whose count is below 5, keep the rest unchanged
def appendDelimiterError = udf((data: String, count: Int) => {
  if (count < 5) s"convert value to ${data} - error"
  else data
})
invalidrecords.withColumn("Data",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)
You should get the following output:
+----------------------------+-----+
|Data |Count|
+----------------------------+-----+
|Hello |5 |
|convert value to How - error|3 |
|World |5 |
+----------------------------+-----+
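As a side note (not part of the original answer), the same conditional transformation can usually be written with Spark's built-in when/otherwise and concat column functions instead of a udf, which Spark can generally optimise better than an opaque udf; a minimal sketch against the same invalidrecords dataframe:

import org.apache.spark.sql.functions.{when, col, concat, lit}

// Build the new Data value with column expressions only: prefix and suffix
// the original value when Count is below 5, otherwise keep it unchanged.
invalidrecords.withColumn(
  "Data",
  when(col("Count") < 5, concat(lit("convert value to "), col("Data"), lit(" - error")))
    .otherwise(col("Data"))
).show(false)

This should produce the same output as the udf version above.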
Upvotes: 15