CodeReaper

Reputation: 387

Spark UDF in Scala for Extracting Relevant Data

I have a DataFrame with a column that needs some cleaning. I am looking for a regex pattern that can be applied in a Spark UDF in Java/Scala to extract the valid content from a String.

Sample input row of column userId as shown in the DataFrame below:

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Expected Transformation of column named "userId":

A string which looks like:

105286112|115090439|29818926

I need the logic/approach for transforming the userId column so that I can wrap it in a UDF. Can this be done with a regex, or is there some other approach?

The input DataFrame looks like this:

+--------------------+--------------------+
|    dt_geo_cat_brand|        userId      |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+

Schema:

root
 |-- dt_geo_cat_brand: string (nullable = true)
 |-- userId: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

Desired Output:

+--------------------+--------------------+
|    dt_geo_cat_brand|         userId     |
+--------------------+--------------------+
|2017-10-30_17-18 ...|133207500,1993333444|
|2017-10-19_21-22 ...|122122212,3432323333|
|2017-10-29_17-18 ...|274188233,8869696966|
|2017-10-29_14-16 ...|862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+--------------------+

and so on...

Upvotes: 1

Views: 508

Answers (2)

Shaido

Reputation: 28322

You do not need a regex to solve this. The data is formatted as an array of structs and, looking at the schema, what you want is the _1 string of each struct. This can be solved with a UDF that extracts the values and then joins them into a single string with mkString("|") to get the expected output:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0)).mkString("|")
})

df.withColumn("userId", extract_id($"userId"))
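For illustration, the core of the UDF body can be tried as plain Scala on the sample row from the question (a minimal sketch; the tuples stand in for the Row structs):

```scala
// Each array element is a (userId, metadata) pair; keep only the first
// field of every pair and join with "|", which is exactly what
// arr.map(_.getAs[String](0)).mkString("|") does on the Rows.
val rows = Seq(
  ("105286112", "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX"),
  ("115090439", "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX"),
  ("29818926",  "2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX")
)
val joined = rows.map(_._1).mkString("|")
println(joined)  // 105286112|115090439|29818926
```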

Addition as per comment #1:

If you want to save the result partitioned on dt_geo_cat_brand as csv files (with each value on its own row), you can do it as follows. First, return a list from the udf instead of a string and use explode:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0))
})

val df2 = df.withColumn("userId", explode(extract_id($"userId")))
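What explode does to the extracted id lists can be sketched in plain Scala with flatMap: one (key, ids) row fans out into one (key, id) row per id (sample values taken from the question):

```scala
// One (key, ids) row becomes one (key, id) row per id, mirroring explode.
val rows = Seq(("2017-10-30_17-18", Seq("133207500", "1993333444")))
val exploded = rows.flatMap { case (k, ids) => ids.map(id => (k, id)) }
println(exploded)
```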

Then use partitionBy("dt_geo_cat_brand") when saving. This will create a folder structure based on the value of the dt_geo_cat_brand column. Depending on the partitioning, the number of csv files in each folder can differ, but each folder will only contain rows for a single value of dt_geo_cat_brand (use repartition(1) before saving if you want a single file and have enough memory).

df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)
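For reference, partitionBy writes one subdirectory per distinct key value, so the output layout will look roughly like this (names are illustrative; the part-file names are generated by Spark):

```text
baseOutputBucketPath/
    dt_geo_cat_brand=<value 1>/part-00000-<generated id>.csv
    dt_geo_cat_brand=<value 2>/part-00000-<generated id>.csv
    ...
```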

Additional as per comment #2:

To save separate files without using partitionBy, you could do as follows (although the partitionBy approach is recommended). First, find all distinct values in dt_geo_cat_brand:

val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()

For each of the values, filter the dataframe and save it (use the exploded df2 dataframe from addition #1 here):

vals.foreach { v =>
  df2.filter($"dt_geo_cat_brand" === v)
    .select("userId")
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}

Alternatively, if the string-producing udf is used instead of the exploded dataframe, split on "|":

vals.foreach { v =>
  df.filter($"dt_geo_cat_brand" === v)
    .select(split($"userId", "\\|").as("userId"))
    .write
    .csv(s"$baseOutputBucketPath=$v/")
}
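The distinct-then-filter pattern in the loops above can be sketched in plain Scala (sample key/id values taken from the question's desired output):

```scala
// Collect distinct keys, then build one filtered subset per key --
// each subset would become its own csv directory in the Spark version.
val data = Seq(
  ("2017-10-30_17-18", "133207500"),
  ("2017-10-30_17-18", "1993333444"),
  ("2017-10-19_21-22", "122122212")
)
val keys = data.map(_._1).distinct
val perKey = keys.map(k => k -> data.filter(_._1 == k).map(_._2))
```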

Upvotes: 1

Praveen L

Reputation: 987

Write a UDF using the regex below. It will extract what is needed (note that the pattern has three capture groups, so it assumes exactly three entries per row).

import ss.implicits._

val df = ss.read.csv(path)
df.show()

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*" // regex which can extract the required data

val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"   // input string
val mat = reg.r.findAllIn(input)  // extracting the data

while (mat.hasNext) {
    mat.next()
    println(mat.group(1) + "|" + mat.group(2)+ "|" +  mat.group(3)) // each group will print the 3 extracted fields
}

Output :

105286112|115090439|29818926
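As a side note, the pattern above hardcodes three capture groups, so it only works for rows with exactly three entries. A variant that scans for every "[digits," occurrence handles any number of entries (a plain-Scala sketch; this alternative regex is my own, not from the answer above):

```scala
// Match the digits that directly follow each opening bracket,
// regardless of how many [id, ...] entries the row contains.
val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], " +
  "[115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], " +
  "[29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"
val ids = "\\[(\\d+),".r.findAllMatchIn(input).map(_.group(1)).mkString("|")
println(ids)  // 105286112|115090439|29818926
```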

With UDF:

import ss.implicits._

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*"

def reg_func = { (s: String) =>
    val mat = reg.r.findAllIn(s)

    var out = ""
    while (mat.hasNext) {
        mat.next()
        out = mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)
    }
    out
}

val reg_udf = udf(reg_func)

val df = ss.read.text(path)
    .withColumn("Extracted_fields", reg_udf($"value"))
df.show(false)

Input (a second sample record was created for testing):

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Output:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|value                                                                                                                                                                                       |Extracted_fields            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286112|115090439|29818926|
|[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286113|115090440|29818927|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+

Upvotes: 1
