rajesh

Reputation: 172

Spark column "sub-string" replace when present in other column (row)

In Scala Spark, I need to efficiently replace `{0}` in the Description column with the value available in the States column, as shown in the output.
It would be appreciated if you could answer this without using a Spark UDF.

Input : Input DF

Output : Output DF

Upvotes: 1

Views: 1413

Answers (5)

s.polam

Reputation: 10362

Without a UDF.

Use the overload of `regexp_replace` that takes the pattern and replacement as columns: `def regexp_replace(e: org.apache.spark.sql.Column, pattern: org.apache.spark.sql.Column, replacement: org.apache.spark.sql.Column)`

scala>  val df = Seq(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur")).toDF("description","states")
df: org.apache.spark.sql.DataFrame = [description: string, states: string]

scala> df.show(false)
+-----------------------------------------------------+--------------+
|description                                          |states        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+

The braces in the pattern must be escaped: `lit("\\{0\\}")`

scala> df
.withColumn("description",
            regexp_replace(
                      $"description",
                      lit("\\{0\\}"),
                      $"states"
                   )
           )
.show(false)

+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+

Handle null in states column.

scala> df.withColumn("description",when($"states".isNotNull,regexp_replace($"description",lit("\\{0\\}"),$"states")).otherwise($"description")).show(false)
+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
|Sample Data with null                                    |null          |
+---------------------------------------------------------+--------------+
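As a side note (not part of the original answer): because `regexp_replace` returns null whenever its replacement column is null, the same null handling can be expressed with `coalesce` instead of `when`/`otherwise`. A sketch, assuming the same `df`:

```scala
import org.apache.spark.sql.functions.{coalesce, lit, regexp_replace}

// regexp_replace yields null whenever states is null,
// so coalesce falls back to the untouched description.
df.withColumn(
  "description",
  coalesce(regexp_replace($"description", lit("\\{0\\}"), $"states"), $"description")
).show(false)
```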

Upvotes: 1

Som

Reputation: 6323

Try this-

1. Load the test data

    val df = spark.range(1)
      .withColumn("Description", lit("{0} is the 4th biggest"))
      .withColumn("States", lit("Andhra Pradesh"))

    df.show(false)
    df.printSchema()
    /**
      * +---+----------------------+--------------+
      * |id |Description           |States        |
      * +---+----------------------+--------------+
      * |0  |{0} is the 4th biggest|Andhra Pradesh|
      * +---+----------------------+--------------+
      *
      * root
      * |-- id: long (nullable = false)
      * |-- Description: string (nullable = false)
      * |-- States: string (nullable = false)
      */

2. Without UDF

 df.withColumn("Description", expr("case when States is null then Description else replace(Description, '{0}', States) end"))
      .show(false)

    /**
      * +---+---------------------------------+--------------+
      * |id |Description                      |States        |
      * +---+---------------------------------+--------------+
      * |0  |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
      * +---+---------------------------------+--------------+
      */

3. With UDF

    val replace1 = udf((s: String, replace: String) => java.text.MessageFormat.format(s, replace))
    df.withColumn("Description", replace1($"Description", $"States"))
      .show(false)

    /**
      * +---+---------------------------------+--------------+
      * |id |Description                      |States        |
      * +---+---------------------------------+--------------+
      * |0  |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
      * +---+---------------------------------+--------------+
      */

Upvotes: 0

ForeverLearner

Reputation: 2103

EDIT 1 : Since you are using a dataframe, you could use:

df.map(each => {
  val col1 = each.getString(0)
  val col2 = each.getString(1)
  val newCol = col1.replace("{0}", col2)
  // return the changed value
  (newCol, col2)
}).toDF("Description", "States").show

If you are working with a large dataset, it is better to use a UDF.
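The UDF alternative mentioned above could look like this (a sketch; a `df` with string columns `Description` and `States` is assumed, and both values are assumed non-null):

```scala
import org.apache.spark.sql.functions.udf

// Plain string replacement wrapped in a UDF.
// Note: this version does not handle nulls in either column.
val replacePlaceholder =
  udf((desc: String, state: String) => desc.replace("{0}", state))

df.withColumn("Description", replacePlaceholder($"Description", $"States"))
  .show(false)
```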

You could also use the RDD approach and map a function over each value. Please see the code below:

scala> val input = List(("{0} is One","1"), ("{0} is Two", "2"))
input: List[(String, String)] = List(({0} is One,1), ({0} is Two,2))

scala> val rdd = sc.parallelize(input)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:26


scala> rdd.collect().foreach(println)
({0} is One,1)                                                                  
({0} is Two,2)

// Replace each {0} with the value in the 2nd column
scala> rdd.map(each => each._1.replace("{0}",each._2)).collect().foreach(println)
1 is One
2 is Two

Upvotes: 0

Chema

Reputation: 2828

You could follow this approach

input

+-----------------------------------------------------+--------------+
|Description                                          |States        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkColumnSubstring {

  // build the spark session
  val spark = SparkSession
    .builder()
    .appName("SparkColumnSubstring")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","SparkColumnSubstring") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    val data = List(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur"))

    try {
      // input dataframe
      val dataDF = sc.parallelize(data).toDF("Description","States")
      dataDF.show(truncate = false)

      // transforming the data
      val dataSub = dataDF.map(r => (r(0).toString.replace("{0}", r(1).toString),r(1).toString)).toDF("Description", "States")

      dataSub.show(truncate = false)

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

expected output

+---------------------------------------------------------+--------------+
|Description                                              |States        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+

Hope this helps.

Upvotes: 0

Koenig Lear

Reputation: 2436

You can define a UDF by wrapping a plain Scala function with `udf`:

val f = udf((format: String, data: String) => format.replace("{0}", data))

You can replace an existing column's values using `df.withColumn`:

df.withColumn("Description", f(df("Description"), df("States")))

Upvotes: 0
