Metadata

Reputation: 2085

How to apply a regex pattern to a DataFrame's String columns in Scala?

I have a dataframe yearDF, created by reading an RDBMS table as below:

val yearDF = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", s"(${query}) as year2017")
    .option("user", devUserName)
    .option("password", devPassword)
    .option("numPartitions", 15)
    .load()

I have to apply a regex pattern to the above dataframe before ingesting it into a Hive table on HDFS. Below is the regex pattern:

regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' )

I need to apply this regex only to the columns of the dataframe yearDF that are of String type. I tried the following:

val regExpr = yearDF.schema.fields
    .map(x => 
        if(x.dataType == String)
             "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x,x)
     )
yearDF.selectExpr(regExpr:_*)

But it gives me a compilation error: Type mismatch, expected: Seq[String], actual: Array[Any]

I cannot simply use yearDF.columns.map, since that would act on all the columns, and I am unable to form the logic properly here. Could anyone let me know how I can apply the regex above to only the String-type columns of the dataframe yearDF?

Upvotes: 0

Views: 2421

Answers (2)

Ashish Jain

Reputation: 1

I also faced a similar issue while applying regexp_replace() to only the String columns of a dataframe. The trick is to build a regex pattern (in my case, pattern) that resolves correctly inside the double quotes of the SQL expression, which requires additional escape characters.

val pattern="\"\\\\\\\\[\\\\\\\\x{FFFD}\\\\\\\\x{0}\\\\\\\\x{1F}\\\\\\\\x{81}\\\\\\\\x{8D}\\\\\\\\x{8F}\\\\\\\\x{90}\\\\\\\\x{9D}\\\\\\\\x{A0}\\\\\\\\x{0380}\\\\\\\\]\""

val regExpr = parq_out_df_temp.schema.fields.map(x =>
    if (x.dataType == StringType) s"regexp_replace(%s, $pattern, '') as %s".format(x.name, x.name)
    else x.name)

val parq_out=parq_out_df_temp.selectExpr(regExpr:_*)

This worked fine for me!!
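For a minimal illustration of why the embedded quotes matter, consider this sketch (df and its String column name are hypothetical placeholders). Spark SQL accepts double-quoted string literals, so the pattern string must carry its own quotes to be parsed as a literal rather than a column reference inside selectExpr:

// The Scala string below renders as "[0-9]+" (quotes included) in the
// generated SQL, so Spark parses it as a string literal:
val digits = "\"[0-9]+\""
df.selectExpr(s"regexp_replace(name, $digits, '') as name")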

Upvotes: 0

T. Gawęda

Reputation: 16076

It's because yearDF.selectExpr(regExpr:_*) expects regExpr to be a Seq of String, while your regExpr is Array[Any]. That much you can see in the message, but why is it Array[Any]?

Look at your map function: for each field in the schema, you map each column of StringType to an expression string, but every other column to nothing at all. An if without an else returns Unit for the non-matching case, so the common element type widens to Any and the result is Array[Any].
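A minimal Scala illustration of the widening, independent of Spark:

// Without an else branch, the non-matching case yields Unit,
// so the common element type widens to Any:
val xs = Array(1, 2, 3).map(x => if (x > 1) x.toString)         // Array[Any]

// With an else branch, both arms are String, so the type stays precise:
val ys = Array(1, 2, 3).map(x => if (x > 1) x.toString else "") // Array[String]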

Btw., use org.apache.spark.sql.types.StringType, not String.

So, instead, write:

import org.apache.spark.sql.types.StringType

val regExpr = yearDF.schema.fields
    .map(x =>
        if (x.dataType == StringType)
            "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x.name, x.name)
        else x.name
    )
yearDF.selectExpr(regExpr: _*)
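As a side note, the same per-column selection can also be done with the Column-based API instead of selectExpr. A sketch under two assumptions I'm adding here: the cleanup is collapsed into one Java-style regex (Spark's regexp_replace(column, pattern, replacement) replaces all matches, so no 'g' flag exists or is needed), and the PostgreSQL-style E'...' literals from the question are dropped, since those only work if the expression runs inside the database:

import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.StringType

// Fold over the String columns, replacing each with a cleaned copy.
// The single pattern approximates the newline/carriage-return/tab/control-A
// classes from the question's expression.
val cleanedDF = yearDF.schema.fields
    .filter(_.dataType == StringType)
    .foldLeft(yearDF) { (df, f) =>
        df.withColumn(f.name, regexp_replace(col(f.name), "[\\n\\r\\t\\x01]+", " "))
    }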

Upvotes: 1
