Reputation: 387
I have a DataFrame with a column that needs some cleaning. I am looking for a regex pattern that can be applied in a Spark UDF in Java/Scala to extract the valid content from a string.
A sample value of the column userId, as shown in the DataFrame below:
[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
Expected Transformation of column named "userId":
A string which looks like:
105286112|115090439|29818926
I need the logic/approach for transforming the userId
column so I can turn it into a UDF. Can it be done with a regex or some other approach?
The input DataFrame looks like this:
+--------------------+--------------------+
| dt_geo_cat_brand| userId |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+
Schema:
root
|-- dt_geo_cat_brand: string (nullable = true)
|-- userId: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
Desired Output:
+--------------------+--------------------+
| dt_geo_cat_brand| userId |
+--------------------+--------------------+
|2017-10-30_17-18 ...|133207500,1993333444|
|2017-10-19_21-22 ...|122122212,3432323333|
|2017-10-29_17-18 ...|274188233,8869696966|
|2017-10-29_14-16 ...|862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+--------------------+
and so on...
Upvotes: 1
Views: 508
Reputation: 28322
You do not need a regex to solve this. The data is formatted as an array of structs, and looking at the schema, what you want is the _1
string of each struct. This can be solved with a UDF that extracts the value and then joins everything into a single string with mkString("|")
to get the expected output:
val extract_id = udf((arr: Seq[Row]) => {
  arr.map(_.getAs[String](0)).mkString("|")
})
df.withColumn("userId", extract_id($"userId"))
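The core of that UDF can be sketched outside Spark. The snippet below is a minimal plain-Scala sketch of the same extract-then-join logic, using tuples in place of Spark Row objects:

```scala
// Plain-Scala sketch of the UDF body: in the real UDF each element is a
// Spark Row and the id is read with getAs[String](0); here tuples stand in.
val rows = Seq(
  ("105286112", "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX"),
  ("115090439", "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX"),
  ("29818926",  "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX")
)
val joined = rows.map(_._1).mkString("|")
println(joined) // 105286112|115090439|29818926
```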
Addition as per comment #1:
If you want to save the result to a csv file partitioned on dt_geo_cat_brand
(each value on its own row), you can do it as follows. First, return a list from the udf instead of a string, and use explode
:
val extract_id = udf((arr: Seq[Row]) => {
  arr.map(_.getAs[String](0))
})
val df2 = df.withColumn("userId", explode(extract_id($"userId")))
Then use partitionBy("dt_geo_cat_brand")
when saving. This will create a folder structure based on the value of the dt_geo_cat_brand
column. Depending on the partitioning, the number of csv files in each folder can differ, but they will all contain values for a single dt_geo_cat_brand
value (use repartition(1)
before saving if you want a single file and have enough memory).
df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)
Addition as per comment #2:
To save as separate files without using partitionBy
, you could do as follows (the partitionBy
approach is recommended). First, find all distinct values in dt_geo_cat_brand
:
val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()
For each of the values, filter the dataframe and save it (use the exploded df2
dataframe from addition #1 here):
vals.foreach { v =>
  df2.filter($"dt_geo_cat_brand" === v)
    .select("userId")
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}
Alternatively, do not use the exploded dataframe but split on "|"
if the first udf (the one that returns a string) is used:
vals.foreach { v =>
  df.filter($"dt_geo_cat_brand" === v)
    .select(split($"userId", "\\|").as("userId"))
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}
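Note that split takes a Java regular expression, which is why the pipe is escaped as "\\|": an unescaped "|" is regex alternation between two empty patterns and splits between every character. A quick plain-Scala illustration (String.split follows the same rules as Spark's split):

```scala
val joined = "105286112|115090439|29818926"
// Escaped pipe: splits on the literal "|" character.
println(joined.split("\\|").toList) // List(105286112, 115090439, 29818926)
// Unescaped pipe: zero-width alternation, splits between every character.
println(joined.split("|").take(3).toList) // List(1, 0, 5)
```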
Upvotes: 1
Reputation: 987
Write a UDF using the regex below. It will extract what is needed.
import ss.implicits._
val df = ss.read.csv(path)
df.show()
val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*" // regex which can extract the required data
val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]" // input string
val mat = reg.r.findAllIn(input) // extracting the data
println(mat)
while (mat.hasNext) {
  mat.next()
  println(mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)) // the 3 capture groups hold the extracted ids
}
Output :
105286112|115090439|29818926
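Note that this pattern hard-codes exactly three [id, ...] entries. A generalized variant (my addition, not part of the original answer) could match each entry with a per-entry pattern and collect every id via findAllMatchIn, regardless of how many entries the array holds:

```scala
// Hypothetical generalization: match each "[<digits>," entry instead of
// hard-coding three capture groups.
val anyId = """\[(\d+),""".r

def extractIds(s: String): String =
  anyId.findAllMatchIn(s).map(_.group(1)).mkString("|")

val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"
println(extractIds(input)) // 105286112|115090439|29818926
```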
With UDF:
import ss.implicits._
val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*"
def reg_func = { (s: String) =>
  val mat = reg.r.findAllIn(s)
  var out = ""
  while (mat.hasNext) {
    mat.next()
    out = mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)
  }
  out
}
val reg_udf = udf(reg_func)
val df = ss.read.text(path)
.withColumn("Extracted_fields", reg_udf($"value"))
df.show(false)
Input: (a second sample record was added)
[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
Output:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|value |Extracted_fields |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286112|115090439|29818926|
|[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286113|115090440|29818927|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
Upvotes: 1