Ripley Rip

Reputation: 131

Validation of a CSV against a JSON definition

I'm new to Scala and wondering about the best way to validate a CSV file, preferably using a map function, and to add a new column indicating whether the conditions were met. I want to wrap this in a UDF for my DataFrame in Apache Spark.

Here is the schema:

Record Type  val1  val2  val3
TYPE1           1     2    ZZ
TYPE2           2   555    KK

And JSON definition I want to validate against:

{
  "rows" : {
    "TYPE1" : {
      "fields" : [
        {
          "required" : true,
          "regex" : "TYPE1"
        },
        {
          "required" : true,
          "regex" : "[a-zA-Z]{2}[a-zA-Z]{2}",
          "allowed_values" : null
        },
        {
          "required" : true,
          "regex" : "[a-zA-Z]{2}[a-zA-Z]{2}",
          "allowed_values" : ["ZZ", "KK"]
        }
      ]
    }
  }
}

Upvotes: 0

Views: 807

Answers (1)

ELinda

Reputation: 2821

I'm not sure about your JSON definition, or whether Record Type is a column in the CSV, but here's a simplification -- you can add "Record Type" logic around it if needed.

Assuming a file validator.json:

{
  "fields" : [
    {
      "name" : "val1",
      "regex": "[0-9]+"
    },{
      "name" : "val2",
      "regex" :"[0-9]+"
    },{
      "name" : "val3",
      "regex" :"[A-Z]{2}"
    }
  ]
}
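As an aside, the same filter expression can also be assembled without any JSON parsing at all, by hand-transcribing the field definitions into plain Scala (a sketch; the Map literals below mirror validator.json rather than reading it from disk):

```scala
// Hand-transcribed copy of the validator.json field definitions
// (built directly in Scala rather than parsed from the file).
val fields: Seq[Map[String, String]] = Seq(
  Map("name" -> "val1", "regex" -> "[0-9]+"),
  Map("name" -> "val2", "regex" -> "[0-9]+"),
  Map("name" -> "val3", "regex" -> "[A-Z]{2}")
)

// Assemble a Spark SQL filter expression: one rlike test per field.
val exprStr: String =
  fields.map(f => s"${f("name")} rlike '${f("regex")}'").mkString(" AND ")
// exprStr: "val1 rlike '[0-9]+' AND val2 rlike '[0-9]+' AND val3 rlike '[A-Z]{2}'"
```

This trades flexibility (the rules are compiled into the code) for dropping the Jackson dependency.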

Generally, by default (without extra options regarding the schema), spark.read.format("csv").option("header", "true").load("file.csv") will use Strings for all of the columns in your file. Here, it is assumed you have a header val1,val2,val3 as the first line of your CSV. An equivalent DataFrame defined inline:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.functions.expr
import scala.io.Source
import spark.implicits._  // needed for toDF (assumes a SparkSession named spark, as in spark-shell)

val df = Seq(("1", "2", "ZZ"), ("2", "555", "KK")).toDF("val1", "val2", "val3")

val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
// read the validator as one long string
val jsonString = Source.fromFile("validator.json").getLines.mkString("")

// map the json string into an object (nested map)
val regexMap: Map[String, Seq[Map[String, String]]] = mapper.readValue(jsonString, classOf[Map[String, Seq[Map[String, String]]]])

//val1 rlike '[0-9]+' AND val2 rlike '[0-9]+' AND val3 rlike '[A-Z]{2}'
val exprStr: String = regexMap("fields")
  .map((fieldDef: Map[String, String]) => s"${fieldDef("name")} rlike '${fieldDef("regex")}'")
  .mkString(" AND ")

// this asks whether all rows match
val matchingRowCount: Long = df.filter(expr(exprStr)).count

// if the counts match, then all of the rows follow the rules
df.count == matchingRowCount

// this adds a column about whether the row matches
df.withColumn("matches",expr(exprStr)).show

result:

+----+----+----+-------+
|val1|val2|val3|matches|
+----+----+----+-------+
|   1|   2|  ZZ|   true|
|   2| 555|  KK|   true|
+----+----+----+-------+
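If you specifically want the UDF approach from the question, the per-row logic can first be written as a plain Scala function (a sketch; rowMatches and fieldDefs are illustrative names, not part of the code above):

```scala
// Per-row validator mirroring the filter expression: every value must
// match its field's regex. Note: matches() anchors the regex to the whole
// string, while Spark's rlike looks for a substring match.
def rowMatches(fields: Seq[(String, String)])(values: Seq[String]): Boolean =
  fields.zip(values).forall { case ((_, regex), value) =>
    value.matches(regex)
  }

val fieldDefs = Seq("val1" -> "[0-9]+", "val2" -> "[0-9]+", "val3" -> "[A-Z]{2}")

rowMatches(fieldDefs)(Seq("1", "2", "ZZ"))   // true
rowMatches(fieldDefs)(Seq("2", "555", "kk")) // false: lowercase fails [A-Z]{2}

// In Spark this could then be registered as a UDF (hypothetical sketch):
// val matchesUdf = udf((v1: String, v2: String, v3: String) =>
//   rowMatches(fieldDefs)(Seq(v1, v2, v3)))
// df.withColumn("matches", matchesUdf($"val1", $"val2", $"val3"))
```

The expr-based version above is usually preferable, since Catalyst can optimize built-in expressions like rlike but treats a UDF as an opaque function.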

Upvotes: 1
