Reputation: 403
I am new to Scala and am trying to build a custom schema from an array of elements, so that I can read files using that schema.
I read the arrays from a JSON file, applied explode, and created a DataFrame with one row per element of the columns array.
val otherPeople = spark.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
The output obtained is:
column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)
val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")
| name|datatype|length|
| object_number| varchar| 100|
| function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| region| varchar| 100|
| country| varchar| null|
Now, for all the values listed above, I am trying to create the schema dynamically using the code below:
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
)
def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}
The problem with the above is that I get the data types in the struct, but I would also like to get the precision and scale, only for the decimal datatype, with these restrictions: if the length for a decimal is null or not present, the default (10,0) should be used, and if the precision is greater than 38, (38,0) should be used.
Upvotes: 1
Views: 1257
Reputation: 2838
This approach works.
Here is a full example that completes your code and shows the expected result.
You can add more variants to val data to test other cases.
/**
 * Parses a "precision,scale" string to obtain a tuple with precision and scale.
 * @param precision Option[String]
 * @return (Int, Int)
 */
def getDecimalScale(precision: Option[String]): (Int, Int) = {
  precision match {
    case Some(pr) =>
      pr.split(",").toList match {
        // Precision above 38 is clamped to (38, 0).
        case List(h, _) if h.trim.toInt > 38 => (38, 0)
        // Parse the whole scale, not just its first digit.
        case List(h, t) => (h.trim.toInt, t.trim.toInt)
        case _ => (10, 0)
      }
    case None => (10, 0)
  }
}
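The function above throws a NumberFormatException if the length contains something that is not a number. As a rough sketch of a more defensive variant, assuming malformed values should also fall back to (10,0): `parseDecimalLength` is a hypothetical name, and treating a bare "17" as (17,0) and rejecting a scale larger than the precision are my assumptions, not part of the question.

```scala
import scala.util.Try

// Hypothetical helper (not from the original answer): parses "precision,scale"
// and falls back to (10, 0) when the value is missing or malformed.
def parseDecimalLength(length: Option[String]): (Int, Int) = {
  val parsed: Option[(Int, Int)] = for {
    raw <- length
    parts = raw.split(",").map(_.trim).toList
    precision <- parts.headOption.flatMap(p => Try(p.toInt).toOption)
  } yield {
    // Assumption: a missing or unparseable scale is treated as 0.
    val scale = parts.lift(1).flatMap(s => Try(s.toInt).toOption).getOrElse(0)
    (precision, scale)
  }

  parsed match {
    case Some((p, _)) if p > 38 => (38, 0) // clamp, as required in the question
    case Some((p, s)) if p >= 1 && s >= 0 && s <= p => (p, s)
    case _ => (10, 0) // null, empty, unparseable, or inconsistent input
  }
}
```

It can be swapped in for getDecimalScale in the fold, since it takes the same Option[String] and returns the same tuple.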
val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
("region", "varchar", "100"), ("country", "varchar", null))
import spark.implicits._
val column_name = sc.parallelize(data).toDF("name","datatype","length")
| name|datatype|length|
|object_number| varchar| 100|
|function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| hof_3| decimal| null|
| hof_4| decimal| 39,2|
| region| varchar| 100|
| country| varchar| null|
import org.apache.spark.sql.types._
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => {
    columnRow.getAs[String]("datatype") match {
      case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
      case "decimal" => {
        // Precision and scale come from "length"; null falls back to (10, 0).
        val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
        schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
      }
      case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
    }
  }
)
|-- object_number: string (nullable = true)
|-- function_type: string (nullable = true)
|-- hof_1: decimal(17,3) (nullable = true)
|-- hof_2: decimal(17,2) (nullable = true)
|-- hof_3: decimal(10,0) (nullable = true)
|-- hof_4: decimal(38,0) (nullable = true)
|-- region: string (nullable = true)
|-- country: string (nullable = true)
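As an alternative to folding over a StructType, the same rows can be turned into a DDL string, which Spark can parse into a schema. This is only a sketch of that different technique, not the method used above: `ColumnSpec`, `decimalArgs` and `toDdl` are hypothetical names, and every type other than decimal is mapped to STRING, as in the example above.

```scala
// Hypothetical descriptor mirroring the name/datatype/length rows.
final case class ColumnSpec(name: String, datatype: String, length: Option[String])

// Rule from the question: missing length => (10, 0), precision > 38 => (38, 0).
def decimalArgs(length: Option[String]): (Int, Int) =
  length.map(_.split(",").map(_.trim).toList) match {
    case Some(List(p, _)) if p.toInt > 38 => (38, 0)
    case Some(List(p, s)) => (p.toInt, s.toInt)
    case _ => (10, 0)
  }

// Builds a DDL string; column names are backtick-quoted.
def toDdl(columns: Seq[ColumnSpec]): String =
  columns.map { c =>
    val sqlType = c.datatype match {
      case "decimal" =>
        val (p, s) = decimalArgs(c.length)
        s"DECIMAL($p,$s)"
      case _ => "STRING" // assumption: everything else is read as a string
    }
    s"`${c.name}` $sqlType"
  }.mkString(", ")

val ddl = toDdl(Seq(
  ColumnSpec("object_number", "varchar", Some("100")),
  ColumnSpec("hof_1", "decimal", Some("17,3")),
  ColumnSpec("hof_3", "decimal", None),
  ColumnSpec("hof_4", "decimal", Some("39,2"))
))
println(ddl)

// With Spark on the classpath, the string can then be parsed and used to read files:
//   val schema = org.apache.spark.sql.types.StructType.fromDDL(ddl)
//   val df = spark.read.schema(schema).csv(path) // `path` is a placeholder
```

The DDL string is plain text, so it is easy to log or store next to the JSON metadata. The StructType fold remains the more direct option when the rows are already collected.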
Upvotes: 2