Mahi
Mahi

Reputation: 403

create schema using struct in spark scala

I am new to scala and trying to make custom schema from array of elements to read files based on a new custom schema.

I am reading the arrays from json file and used explode method and created a dataframe for each element in column array.

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

obtained output is :

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")

column_name.show(4)


 +------------------+--------+------+
 |              name|datatype|length|
 +------------------+--------+------+
 |     object_number| varchar|   100|
 |     function_type| varchar|   100|
 |             hof_1| decimal|  17,3|
 |             hof_2| decimal|  17,2|
 |            region| varchar|   100|
 |           country| varchar|  null|
 +------------------+--------+------+

Now for all the values listed above i am trying to creating val schema dynamically using below code

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
  )

def getFieldType(typeName: String): DataType = typeName match {
    case "varchar" => StringType
    // TODO include other types here
    case _ => StringType
  }

problem with above is that i am able to get the datatypes in struct, but i would also like to get (scale and preicion) only for datatype decimal with a restriction condition that max allowable with a condition that if length for decimal if is null or not present we need to take default value as (10,0) and if value present is greater than 38 we need to take default value as (38,0)

Upvotes: 1

Views: 1250

Answers (2)

Chema
Chema

Reputation: 2828

This approach works fine.

I show you a full example that completes your code and the expected result.

You could introduce more variants into val data.

  /**
    * to obtain a tuple with precision and scale
    * @param precision Option[String]
    * @return (Int, Int)
    */
  def getDecimalScale(precision: Option[String]): (Int, Int) = {
    precision match {
      case Some(pr) => {
        pr.split(",").toList match {
          case List(h, _) if h.toInt >= 38 => (38,0)
          case List(h, t) => (h.toInt,t.head.toString.toInt)
          case _ => (10, 0)
        }
      }
      case None => (10, 0)
    }
  }
    val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
      ("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
      ("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
      ("region", "varchar", "100"), ("country", "varchar", null))

    import spark.implicits._

    val column_name = sc.parallelize(data).toDF("name","datatype","length")

    column_name.show()
/*
+-------------+--------+------+
|         name|datatype|length|
+-------------+--------+------+
|object_number| varchar|   100|
|function_type| varchar|   100|
|        hof_1| decimal|  17,3|
|        hof_2| decimal|  17,2|
|        hof_3| decimal|  null|
|        hof_4| decimal|  39,2|
|       region| varchar|   100|
|      country| varchar|  null|
+-------------+--------+------+
*/

    val schemaColumns = column_name.collect()
    schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]
*/

    val schema = schemaColumns.foldLeft(new StructType())(
      (schema, columnRow) => {
        columnRow.getAs[String]("datatype") match {
          case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
          case "decimal" => {
            val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
            schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
          }
          case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
        }
      }
    )

    schema.printTreeString()
/*
root
 |-- object_number: string (nullable = true)
 |-- function_type: string (nullable = true)
 |-- hof_1: decimal(17,3) (nullable = true)
 |-- hof_2: decimal(17,2) (nullable = true)
 |-- hof_3: decimal(10,0) (nullable = true)
 |-- hof_4: decimal(38,0) (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
*/

Upvotes: 2

pasha701
pasha701

Reputation: 7207

Decimal datatype with precision can be created as specified here:

 DataTypes.createDecimalType()

In function "getFieldType" case for Decimal type can be added, smth. like:

case "decimal" => DataTypes.createDecimalType(10,0)

Upvotes: 0

Related Questions