Reputation: 172
In Scala Spark, I need to efficiently replace the {0} placeholder in the Description column with the value from the States column, as shown in the output.
It would be appreciated if you could answer this without using a Spark UDF.
Input: Input DF
Output: Output DF
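For reference, a minimal sketch of the input (reconstructed from the sample rows used in the answers below; column names are assumed):
val df = Seq(
  ("{0} is the 4th biggest state of India", "Andhra Pradesh"),
  ("The {0} remains the most beutiful state of India", "Maharashtra"),
  ("This state {0} often termed as 'Switzerland of India'", "Manipur")
).toDF("Description", "States")
// Expected output: each {0} in Description replaced by that row's States value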
Upvotes: 1
Views: 1413
Reputation: 10362
Without UDF.
Use def regexp_replace(e: org.apache.spark.sql.Column, pattern: org.apache.spark.sql.Column, replacement: org.apache.spark.sql.Column) — the Column-based overload, so the replacement value can come from another column.
scala> val df = Seq(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur")).toDF("description","states")
df: org.apache.spark.sql.DataFrame = [description: string, states: string]
scala> df.show(false)
+-----------------------------------------------------+--------------+
|description |states |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India |Andhra Pradesh|
|The {0} remains the most beutiful state of India |Maharashtra |
|This state {0} often termed as 'Switzerland of India'|Manipur |
+-----------------------------------------------------+--------------+
pattern - lit("\\{0\\}") — the braces must be escaped because { and } are regex metacharacters.
scala> df
.withColumn("description",
regexp_replace(
$"description",
lit("\\{0\\}"),
$"states"
)
)
.show(false)
+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra |
|This state Manipur often termed as 'Switzerland of India'|Manipur |
+---------------------------------------------------------+--------------+
Handle null in the states column (the last row below assumes a row with a null states value was added to df):
scala> df.withColumn("description",when($"states".isNotNull,regexp_replace($"description",lit("\\{0\\}"),$"states")).otherwise($"description")).show(false)
+---------------------------------------------------------+--------------+
|description |states |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra |
|This state Manipur often termed as 'Switzerland of India'|Manipur |
|Sample Data with null |null |
+---------------------------------------------------------+--------------+
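An equivalent null guard (a sketch, not from the original answer; it relies on regexp_replace returning null when its replacement argument is null, which coalesce then falls back from):
scala> df.withColumn("description",coalesce(regexp_replace($"description",lit("\\{0\\}"),$"states"),$"description")).show(false)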
Upvotes: 1
Reputation: 6323
Try this (replace here is the Spark SQL string function, which performs a literal, non-regex replacement, and the case expression guards against null States):
val df = spark.range(1)
.withColumn("Description", lit("{0} is the 4th biggest"))
.withColumn("States", lit("Andhra Pradesh"))
df.show(false)
df.printSchema()
/**
* +---+----------------------+--------------+
* |id |Description |States |
* +---+----------------------+--------------+
* |0 |{0} is the 4th biggest|Andhra Pradesh|
* +---+----------------------+--------------+
*
* root
* |-- id: long (nullable = false)
* |-- Description: string (nullable = false)
* |-- States: string (nullable = false)
*/
df.withColumn("Description", expr("case when States is null then Description else replace(Description, '{0}', States) end"))
.show(false)
/**
* +---+---------------------------------+--------------+
* |id |Description |States |
* +---+---------------------------------+--------------+
* |0 |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
 * +---+---------------------------------+--------------+
 */
Alternatively, using a UDF with java.text.MessageFormat:
val replace1 = udf((s: String, replace: String) => java.text.MessageFormat.format(s, replace))
df.withColumn("Description", replace1($"Description", $"States"))
.show(false)
/**
* +---+---------------------------------+--------------+
* |id |Description |States |
* +---+---------------------------------+--------------+
* |0 |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
* +---+---------------------------------+--------------+
*/
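One caveat with the MessageFormat variant (an observation, not from the original answer): MessageFormat treats single quotes as escape characters, so a description containing quotes, like the 'Switzerland of India' row in the question, comes out mangled:
// MessageFormat strips the single quotes from the output:
java.text.MessageFormat.format("This state {0} often termed as 'Switzerland of India'", "Manipur")
// -> This state Manipur often termed as Switzerland of India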
Upvotes: 0
Reputation: 2103
EDIT 1: Since you are using a DataFrame, you could use:
// note: this relies on `import spark.implicits._` for the tuple Encoder
df.map(each => {
  val col1 = each.getString(0) // Description
  val col2 = each.getString(1) // States
  val newCol = col1.replace("{0}", col2)
  // return the changed value
  (newCol, col2)
}).toDF("Description", "States").show
If you have a large dataset, it may be better to use a UDF.
You could also use the RDD approach, mapping a function over each value. Please see the code below:
scala> val input = List(("{0} is One","1"), ("{0} is Two", "2"))
input: List[(String, String)] = List(({0} is One,1), ({0} is Two,2))
scala> val rdd = sc.parallelize(input)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.collect().foreach(println)
({0} is One,1)
({0} is Two,2)
// Replace each {0} with the value in the 2nd column
scala> rdd.map(each => each._1.replace("{0}",each._2)).collect().foreach(println)
1 is One
2 is Two
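If you need a DataFrame back from the RDD result, a sketch (the column names here are illustrative, not from the original answer):
scala> rdd.map(each => (each._1.replace("{0}", each._2), each._2)).toDF("Description", "States").show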
Upvotes: 0
Reputation: 2828
You could follow this approach.
Input:
+-----------------------------------------------------+--------------+
|Description |States |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India |Andhra Pradesh|
|The {0} remains the most beutiful state of India |Maharashtra |
|This state {0} often termed as 'Switzerland of India'|Manipur |
+-----------------------------------------------------+--------------+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkColumnSubstring {
  // build the spark session
  val spark = SparkSession
    .builder()
    .appName("SparkColumnSubstring")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "SparkColumnSubstring") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext
  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    import spark.implicits._

    val data = List(
      ("{0} is the 4th biggest state of India", "Andhra Pradesh"),
      ("The {0} remains the most beutiful state of India", "Maharashtra"),
      ("This state {0} often termed as 'Switzerland of India'", "Manipur")
    )

    try {
      // input dataframe
      val dataDF = sc.parallelize(data).toDF("Description", "States")
      dataDF.show(truncate = false)

      // transforming the data: replace {0} in Description with the States value
      val dataSub = dataDF.map(r => (r(0).toString.replace("{0}", r(1).toString), r(1).toString)).toDF("Description", "States")
      dataSub.show(truncate = false)

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
Expected output:
+---------------------------------------------------------+--------------+
|Description |States |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra |
|This state Manipur often termed as 'Switzerland of India'|Manipur |
+---------------------------------------------------------+--------------+
Hope this helps.
Upvotes: 0
Reputation: 2436
You can define a UDF by wrapping a plain Scala function with udf:
val f = udf((format: String, data: String) => format.replace("{0}", data))
You can replace an existing column value with a new one using df.withColumn:
df.withColumn("Description", f(df("Description"), df("States")))
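As a hedged addition (not part of the original answer): if States can contain nulls, the UDF above may throw a NullPointerException, so a null-guarded variant is safer:
// leave Description unchanged when either column is null
val fSafe = udf((format: String, data: String) =>
  if (format == null || data == null) format else format.replace("{0}", data))
df.withColumn("Description", fSafe(df("Description"), df("States")))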
Upvotes: 0