Reputation: 2085
I have a dataframe yearDF, created by reading an RDBMS table as below:
val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${query}) as year2017")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("numPartitions", 15)
  .load()
I have to apply a regex pattern to the above dataframe before ingesting it into a Hive table on HDFS. Below is the regex pattern:
regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' )
I should apply this regex only to the columns of the dataframe yearDF that are of datatype String. I tried the following way:
val regExpr = yearDF.schema.fields
  .map(x =>
    if (x.dataType == String)
      "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x, x)
  )
yearDF.selectExpr(regExpr:_*)
But it gives me a compilation error: Type mismatch, expected: Seq[String], actual: Array[Any]
I cannot use yearDF.columns.map as that would act on all the columns, and I am unable to form the logic properly here. Could anyone let me know how I can apply the regex mentioned above only on the String-type columns of the dataframe yearDF?
Upvotes: 0
Views: 2421
Reputation: 1
I also faced a similar issue while applying regexp_replace() to only the string columns of a dataframe. The trick is to build a regex pattern (in my case "pattern") that resolves correctly inside the double quotes, and to apply the escape characters as well.
val pattern="\"\\\\\\\\[\\\\\\\\x{FFFD}\\\\\\\\x{0}\\\\\\\\x{1F}\\\\\\\\x{81}\\\\\\\\x{8D}\\\\\\\\x{8F}\\\\\\\\x{90}\\\\\\\\x{9D}\\\\\\\\x{A0}\\\\\\\\x{0380}\\\\\\\\]\""
val regExpr = parq_out_df_temp.schema.fields.map(x =>
  if (x.dataType == StringType) s"regexp_replace(%s, $pattern, '') as %s".format(x.name, x.name)
  else x.name
)
val parq_out = parq_out_df_temp.selectExpr(regExpr:_*)
This worked fine for me!
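For what it's worth, the expression strings this map produces can be checked without Spark. Below is a minimal sketch with hypothetical column names, where the dataType check is replaced by a plain boolean flag just for illustration:

```scala
// Hypothetical stand-in for the schema: (column name, is it a string column?)
val fields = Seq(("name", true), ("age", false))

// SQL string literal containing the regex; the backslashes are doubled once
// for Scala, so the literal itself carries '[\\r\\n]+'.
val pattern = "'[\\\\r\\\\n]+'"

// String columns get wrapped in regexp_replace, others pass through as-is.
val regExpr = fields.map { case (c, isString) =>
  if (isString) s"regexp_replace($c, $pattern, '') as $c" else c
}

println(regExpr.mkString(", "))
// prints: regexp_replace(name, '[\\r\\n]+', '') as name, age
```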
Upvotes: 0
Reputation: 16076
It's because yearDF.selectExpr(regExpr:_*) expects regExpr to be a Seq of String, while your regExpr is Array[Any]. OK, you see that in the message, but why is it Array[Any]?
Look at your map function: for each field in the schema, you map a column with StringType to an expression with the regular expression, but in every other case the if has no else branch, so the element is Unit, and the common type of String and Unit is Any.
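This type widening is easy to reproduce in plain Scala, with no Spark involved. An if without an else returns Unit for the non-matching elements, so the array's element type widens to Any:

```scala
// `if` without `else`: non-matching elements become Unit, so Array[Any].
val withoutElse = Array(1, 2, 3).map(x => if (x > 1) s"col$x")

// With an `else` that also returns a String, the result stays Array[String].
val withElse = Array(1, 2, 3).map(x => if (x > 1) s"col$x" else s"$x")

println(withoutElse.mkString(", "))  // (), col2, col3
println(withElse.mkString(", "))     // 1, col2, col3
```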
Btw., use org.apache.spark.sql.types.StringType, not String.
So, instead, write:
val regExpr = yearDF.schema.fields
  .map(x =>
    if (x.dataType == StringType)
      "regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x.name, x.name)
    else x.name
  )
yearDF.selectExpr(regExpr:_*)
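As a side note, the same result can be obtained without building SQL expression strings at all, by folding over the string columns with the Column-based regexp_replace from org.apache.spark.sql.functions. A sketch, assuming yearDF is in scope and an active SparkSession (the whitespace/control-character pattern here is a plain Java regex, not the PostgreSQL-style E'…' literal from the question):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.StringType

// Collect the names of the String-typed columns only.
val stringCols = yearDF.schema.fields
  .collect { case f if f.dataType == StringType => f.name }

// Fold over them, replacing runs of \n, \r, \t and Ctrl-A (\x01) in place;
// non-string columns are left untouched.
val cleaned: DataFrame = stringCols.foldLeft(yearDF) { (df, c) =>
  df.withColumn(c, regexp_replace(col(c), "[\\n\\r\\t\\x01]+", " "))
}
```

This avoids the quote-escaping issues of selectExpr entirely, since the pattern never passes through the SQL parser.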
Upvotes: 1