Alg_D

Reputation: 2390

Spark - remove special characters from DataFrame rows with different column types

Assume I have a DataFrame with many columns; some are of string type, others of int type, and others of map type.

e.g. field/column types: stringType | intType | mapType<string,int> | ...

|-----------------|------|--------------------------------------------------------|---|
|myString1        |myInt1|myMap1                                                  |...|
|-----------------|------|--------------------------------------------------------|---|
|"this_is_#string"| 123  |{"str11_in#map":1,"str21_in#map":2, "str31_in#map": 31} |...|
|"this_is_#string"| 456  |{"str12_in#map":1,"str22_in#map":2, "str32_in#map": 32} |...|
|"this_is_#string"| 789  |{"str13_in#map":1,"str23_in#map":2, "str33_in#map": 33} |...|
|-----------------|------|--------------------------------------------------------|---|

I want to remove certain characters, such as '_' and '#', from all columns of string and map type, so the resulting DataFrame/RDD would be:

|--------------|------|--------------------------------------------------|---|
|myString1     |myInt1|myMap1                                            |...|
|--------------|------|--------------------------------------------------|---|
|"thisisstring"| 123  |{"str11inmap":1,"str21inmap":2, "str31inmap": 31} |...|
|"thisisstring"| 456  |{"str12inmap":1,"str22inmap":2, "str32inmap": 32} |...|
|"thisisstring"| 789  |{"str13inmap":1,"str23inmap":2, "str33inmap": 33} |...|
|--------------|------|--------------------------------------------------|---|

I am not sure whether it is better to convert the DataFrame into an RDD and work with that, or to do the work directly on the DataFrame.

Also, I am not sure how best to handle the regexp across the different column types (I am using Scala). I would like to perform this action for all columns of these two types (string and map), avoiding hard-coded column names like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_replace}

def cleanRows(mytabledata: DataFrame): DataFrame = {
  // this will do the work for one specific column (myString1) of type string
  val oneColumn_clean = mytabledata.withColumn("myString1", regexp_replace(col("myString1"), "[_#]", ""))
  ...
  // return type can be RDD or DataFrame...
}

Is there a simple way to do this? Thanks

Upvotes: 1

Views: 17375

Answers (1)

akuiper

Reputation: 214927

One option is to define two udfs, one to handle string-type columns and one to handle Map-type columns:

import org.apache.spark.sql.functions.udf
val df = Seq(("this_is#string", 3, Map("str1_in#map" -> 3))).toDF("myString", "myInt", "myMap")
df.show
+--------------+-----+--------------------+
|      myString|myInt|               myMap|
+--------------+-----+--------------------+
|this_is#string|    3|Map(str1_in#map -...|
+--------------+-----+--------------------+

1) Udf to handle string type columns:

def remove_string: String => String = _.replaceAll("[_#]", "")
def remove_string_udf = udf(remove_string)
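
As a quick sanity check (an illustrative call, not part of the original answer), the plain function can be tried before it is wrapped in a udf:

remove_string("this_is#string")   // returns "thisisstring"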

2) Udf to handle Map type columns:

def remove_map: Map[String, Int] => Map[String, Int] = _.map{ case (k, v) => k.replaceAll("[_#]", "") -> v }
def remove_map_udf = udf(remove_map)
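
The map-cleaning function can be checked the same way (again, just an illustrative call):

remove_map(Map("str1_in#map" -> 3))   // returns Map("str1inmap" -> 3)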

3) Apply the udfs to the corresponding columns to clean them up:

df.withColumn("myString", remove_string_udf($"myString")).
   withColumn("myMap", remove_map_udf($"myMap")).show

+------------+-----+-------------------+
|    myString|myInt|              myMap|
+------------+-----+-------------------+
|thisisstring|    3|Map(str1inmap -> 3)|
+------------+-----+-------------------+
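
Since the question asks to avoid hard-coding column names, the same udfs can also be applied generically by folding over the schema and matching on each column's data type. This is a sketch rather than part of the original answer: the helper name cleanAllColumns is mine, and it assumes the two udfs above are in scope and that map columns are Map[String, Int]:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

def cleanAllColumns(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, field) =>
    field.dataType match {
      // string column: strip '_' and '#' from the value
      case StringType => acc.withColumn(field.name, remove_string_udf(col(field.name)))
      // Map[String, Int] column: strip '_' and '#' from the keys
      case MapType(StringType, IntegerType, _) => acc.withColumn(field.name, remove_map_udf(col(field.name)))
      // leave every other column type untouched
      case _ => acc
    }
  }

cleanAllColumns(df).show

This produces the same result as above, but works for any number of string and map columns without naming them.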

Upvotes: 7
