Shibu

Reputation: 1542

Regex match with dataframe column values

I want to perform a lookup between a Map[String,List[scala.util.matching.Regex]] and a dataframe column. If any regex in the List[scala.util.matching.Regex] matches the dataframe column value, it should return the corresponding key from the Map[String,List[scala.util.matching.Regex]].

Map[String,List[scala.util.matching.Regex]] = Map(m1 -> List(rule1, rule2), m2 -> List(rule3), m3 -> List(rule6))

I want to iterate through the list of regexes and match them against the dataframe column value. It would be better if the regex match could be done in parallel rather than sequentially.

dataframe


+------------------------+
|desc                    |
+------------------------+
|STRING MATCHES SSS rule1|
|STRING MATCHES SSS rule1|
|STRING MATCHES SSS rule1|
|STRING MATCHES SSS rule2|
|STRING MATCHES SSS rule2|
|STRING MATCHES SSS rule3|
|STRING MATCHES SSS rule3|
|STRING MATCHES SSS rule6|
+------------------------+

O/P:

+-------------------+------------------------+
|merchant           |desc                    |
+-------------------+------------------------+
|m1                 |STRING MATCHES SSS rule1|
|m1                 |STRING MATCHES SSS rule1|
|m1                 |STRING MATCHES SSS rule1|
|m1                 |STRING MATCHES SSS rule2|
|m1                 |STRING MATCHES SSS rule2|
|m2                 |STRING MATCHES SSS rule3|
|m2                 |STRING MATCHES SSS rule3|
|m3                 |STRING MATCHES SSS rule6|
+-------------------+------------------------+

Upvotes: 0

Views: 1243

Answers (2)

abiratsis

Reputation: 7316

Here is another way based on the DataFrame map function and a predefined rule set rules:

import spark.implicits._
import scala.util.matching.Regex

val df = Seq(
("STRING MATCHES SSS rule1"),
("STRING MATCHES SSS rule1"),
("STRING MATCHES SSS rule1"),
("STRING MATCHES SSS rule2"),
("STRING MATCHES SSS rule2"),
("STRING MATCHES SSS rule3"),
("STRING MATCHES SSS rule3"),
("STRING MATCHES SSS rule6"),
("STRING MATCHES SSS ruleXXX")
).toDF("desc")

val rules = Map(
  "m1" -> List("rule1".r, "rule2".r), 
  "m2" -> List("rule3".r), 
  "m3" -> List("rule6".r)
)

df.map { r =>
  val desc = r.getString(0)
  // Find the first (key, regex list) pair with at least one regex matching desc
  val merchant = rules.find(_._2.exists(_.findFirstIn(desc).isDefined)) match {
    case Some((m: String, _)) => m
    case None => null
  }

  (merchant, desc)
}.toDF("merchant", "desc").show(false)

Output:

+--------+--------------------------+
|merchant|desc                      |
+--------+--------------------------+
|m1      |STRING MATCHES SSS rule1  |
|m1      |STRING MATCHES SSS rule1  |
|m1      |STRING MATCHES SSS rule1  |
|m1      |STRING MATCHES SSS rule2  |
|m1      |STRING MATCHES SSS rule2  |
|m2      |STRING MATCHES SSS rule3  |
|m2      |STRING MATCHES SSS rule3  |
|m3      |STRING MATCHES SSS rule6  |
|null    |STRING MATCHES SSS ruleXXX|
+--------+--------------------------+

Explanation:

  • rules.find(...): search rules for the first key/value pair

  • _._2.exists(...): whose list of regexes contains at least one regex

  • _.findFirstIn(desc).isDefined: that matches desc

  • case Some((m : String, _)) => m: and extract the key from that matching pair (see the standalone sketch below)
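
To see the chain in isolation, here is roughly the same lookup as plain Scala, outside Spark, reusing the rules map from above (a minimal sketch, not part of the DataFrame pipeline):

import scala.util.matching.Regex

val rules: Map[String, List[Regex]] = Map(
  "m1" -> List("rule1".r, "rule2".r),
  "m2" -> List("rule3".r),
  "m3" -> List("rule6".r)
)

val desc = "STRING MATCHES SSS rule2"

// find returns the first (key, regexes) pair with a matching regex, if any
val merchant = rules
  .find { case (_, regexes) => regexes.exists(_.findFirstIn(desc).isDefined) }
  .map(_._1)       // keep only the key
  .getOrElse(null) // same null fallback as in the DataFrame version

// merchant == "m1" for this input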

PS: I am not sure what you mean by the regex match can be done in parallel rather than sequential, since the map function in the above solution is already executed in parallel. The level of parallelism depends on the selected number of partitions (see the example below). Adding extra parallelism inside the map function, e.g. in the form of threads or Scala Futures, would complicate the code without increasing performance: creating a large number of threads is more likely to create a CPU bottleneck than to speed up your program. Spark is an effective distributed system and there is no need to look for alternatives for parallel execution.
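
For example, you can check and, if needed, increase that parallelism simply through the partition count before calling map (the partition number 8 here is only illustrative):

// Number of partitions = number of tasks that can run the map in parallel
println(df.rdd.getNumPartitions)

// Repartition first if you want more parallel tasks for the matching
val matched = df.repartition(8).map { r =>
  val desc = r.getString(0)
  (rules.find(_._2.exists(_.findFirstIn(desc).isDefined)).map(_._1).orNull, desc)
}.toDF("merchant", "desc")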

Upvotes: 1

Nikhil Suthar

Reputation: 2431

You can declare a UDF like the one below, which will run in parallel and will be fast. The code below is just a reference based on how I understood your problem; you can take it as a starting point and design your UDF accordingly.

scala> import org.apache.spark.sql.functions.{udf, col}

scala> import org.apache.spark.sql.expressions.UserDefinedFunction

scala> def RuleCheck: UserDefinedFunction = udf((colmn: String) => {
     |   val Rule: Map[String, List[String]] = Map(
     |     "Number"   -> List("[0-9]"),
     |     "Statment" -> List("[a-zA-Z]"),
     |     "Fruit"    -> List("apple", "banana", "orange"),
     |     "Country"  -> List("India", "US", "UK"))
     |   val Out = scala.collection.mutable.Set[String]()
     |   // For every rule, add its key when any of its regexes matches the column value
     |   Rule.foreach { case (key, listRgx) =>
     |     listRgx.foreach { x =>
     |       val rgx = x.r
     |       if (rgx.findFirstIn(colmn).isDefined) {
     |         Out += key
     |       }
     |     }
     |   }
     |   Out.mkString(",") })

scala> df.show()
+---+--------------------+
| id|             comment|
+---+--------------------+
|  1|     I have 3 apples|
|  2|I like banana and...|
|  3|        I am from US|
|  4|          1932409243|
|  5|       I like orange|
|  6|         #%@#$@#%@#$|
+---+--------------------+


scala> df.withColumn("Key", RuleCheck(col("comment"))).show(false)
+---+---------------------------------+----------------------+
|id |comment                          |Key                   |
+---+---------------------------------+----------------------+
|1  |I have 3 apples                  |Number,Fruit,Statment |
|2  |I like banana and I am from India|Country,Fruit,Statment|
|3  |I am from US                     |Country,Statment      |
|4  |1932409243                       |Number                |
|5  |I like orange                    |Fruit,Statment        |
|6  |#%@#$@#%@#$                      |                      |
+---+---------------------------------+----------------------+
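
If you want to stay closer to the rules from the question (m1/m2/m3) and return a single merchant instead of a comma-separated set, the same UDF idea could be trimmed down roughly as follows; this is only a sketch, merchantRules and merchantCheck are made-up names, and df here is the question's dataframe with the desc column:

import org.apache.spark.sql.functions.{col, udf}

val merchantRules: Map[String, List[String]] = Map(
  "m1" -> List("rule1", "rule2"),
  "m2" -> List("rule3"),
  "m3" -> List("rule6"))

// Return the first key whose regex list matches the description, or null otherwise
val merchantCheck = udf { (desc: String) =>
  merchantRules
    .find { case (_, patterns) => patterns.exists(_.r.findFirstIn(desc).isDefined) }
    .map(_._1)
    .orNull
}

df.withColumn("merchant", merchantCheck(col("desc"))).show(false)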

Upvotes: 0
