Reputation: 131
I'm new to Scala and wondering what the best way would be to validate a CSV file, preferably using a map function, and to add a new column recording whether the conditions were met. I want to put this in a UDF for my DataFrame in Apache Spark.
Here is the schema:
Record Type  val1  val2  val3
TYPE1        1     2     ZZ
TYPE2        2     555   KK
And JSON definition I want to validate against:
"rows" :
{
"TYPE1" :
"fields" : [
{
"required" : "true",
"regex": "TYPE1",
},
{
"required" : true",
"regex" :"[a-zA-Z]{2}[a-zA-Z]{2}",
"allowed_values": null
},
{
"required" : true",
"regex" :"[a-zA-Z]{2}[a-zA-Z]{2}",
"allowed_values" : ["ZZ","KK"]
}
]
}
Upvotes: 0
Views: 807
Reputation: 2821
I'm not sure how your JSON definition is meant to map onto the CSV, or whether Record Type is a column in the CSV, but here's a simplification -- you can add "Record Type" logic around it if needed.
Assuming a file validator.json:
{
"fields" : [
{
"name" : "val1",
"regex": "[0-9]+"
},{
"name" : "val2",
"regex" :"[0-9]+"
},{
"name" : "val3",
"regex" :"[A-Z]{2}"
}
]
}
Generally, by default (without extra options regarding the schema), spark.read.format("csv").option("header", "true").load("file.csv")
will read all of the columns in your file as Strings. Here it is assumed your CSV has a header, val1,val2,val3, as its first line. An equivalently defined DF inline:
val df = Seq(("1", "2", "ZZ"), ("2", "555", "KK")).toDF("val1", "val2", "val3")
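If you're running this outside spark-shell, here's a minimal sketch of the setup assumed above (the app name and local master are just illustrative; in spark-shell, spark and the implicits are already in scope):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("csv-validation").master("local[*]").getOrCreate()
// needed for .toDF on a Seq of tuples
import spark.implicits._
// or read from disk instead -- all columns come back as Strings by default
val dfFromFile = spark.read.format("csv").option("header", "true").load("file.csv")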
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.functions.expr
import scala.io.Source
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
// read the validator as one long string
val jsonString = Source.fromFile("validator.json").getLines.mkString("")
// map the json string into an object (nested map)
val regexMap:Map[String,Seq[Map[String,String]]] = mapper.readValue(jsonString, classOf[Map[String, Seq[Map[String, String]]]])
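// at this point regexMap("fields") is:
//   Seq(Map("name" -> "val1", "regex" -> "[0-9]+"),
//       Map("name" -> "val2", "regex" -> "[0-9]+"),
//       Map("name" -> "val3", "regex" -> "[A-Z]{2}"))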
//val1 rlike '[0-9]+' AND val2 rlike '[0-9]+' AND val3 rlike '[A-Z]{2}'
val exprStr:String = regexMap("fields").map((fieldDef:Map[String, String]) => s"${fieldDef("name")} rlike '${fieldDef("regex")}'").mkString(" AND ")
// this asks whether all rows match
val matchingRowCount: Long = df.filter(expr(exprStr)).count
// if the counts match, then all of the rows follow the rules
df.count == matchingRowCount
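// to inspect any rows that fail validation:
//   df.filter(!expr(exprStr)).show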
// this adds a column about whether the row matches
df.withColumn("matches",expr(exprStr)).show
result:
+----+----+----+-------+
|val1|val2|val3|matches|
+----+----+----+-------+
| 1| 2| ZZ| true|
| 2| 555| KK| true|
+----+----+----+-------+
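Since you mentioned wanting a UDF: here's a sketch of the same check as a UDF, assuming the regexMap from above is in scope (matchesUdf and patterns are just illustrative names). Note that rlike matches anywhere in the string, so findFirstIn is used below to mirror that:
import org.apache.spark.sql.functions.{col, udf}
// compile each field's regex once, keeping (column name, pattern) pairs
val patterns: Seq[(String, scala.util.matching.Regex)] =
  regexMap("fields").map(f => (f("name"), f("regex").r))
// true when every column's value contains a match for its pattern
val matchesUdf = udf { (val1: String, val2: String, val3: String) =>
  val values = Map("val1" -> val1, "val2" -> val2, "val3" -> val3)
  patterns.forall { case (name, re) => re.findFirstIn(values(name)).isDefined }
}
df.withColumn("matches", matchesUdf(col("val1"), col("val2"), col("val3"))).show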
Upvotes: 1