Reputation: 403
I am new to Scala and am trying to build a custom schema from an array of elements so that I can read files based on that schema.
I read the arrays from a JSON file, used the explode method, and created a DataFrame with a row for each element in the columns array.
val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()
The output obtained is:
column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)
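For reference, a minimal sketch of what the input could look like, inferred from the schema printed above (the values are illustrative, not from the actual file):
import spark.implicits._
// Hypothetical input matching the printed schema; the values are made up
val otherPeopleDataset = Seq(
  """{"columns": [
    {"column_id": "1", "data_sensitivty": "low", "datatype": "varchar", "length": "50", "name": "object_number"},
    {"column_id": "2", "data_sensitivty": "low", "datatype": "varchar", "length": "30", "name": "function_type"}
  ]}"""
).toDS()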
val column_name = column_values.select("name","datatype")
column_name: org.apache.spark.sql.DataFrame = [name: string, datatype: string]
column_name.show(6)
+-----------------+--------+
| name|datatype|
+-----------------+--------+
| object_number| varchar|
| function_type| varchar|
| hof_1| varchar|
| hof_2| varchar|
| region| varchar|
| country| varchar|
+-----------------+--------+
Now, for all the values listed above, I want to create a schema val dynamically.
Example:
val schema = new StructType()
  .add("object_number", StringType, true)
  .add("function_type", StringType, true)
  .add("hof_1", StringType, true)
  .add("hof_2", StringType, true)
  .add("region", StringType, true)
  .add("Country", StringType, true)
I want to build the above struct dynamically once I have obtained the column DataFrame. I read that I first need to create a map of the datatype for each element and then create the struct in a loop. Can someone help here, as I have limited knowledge of Scala?
Upvotes: 0
Views: 1325
Reputation: 2828
You can follow this approach; it should work for your example:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The schema is encoded in a string
val schemaString = "object_number function_type hof_1 hof_2 region Country"
// Generate the schema based on the string
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert each line of the text file to a Row
val rowRDD = sc.textFile("dir").map(line => line.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4), attributes(5)))
// Apply the schema to the RDD
val perDF = spark.createDataFrame(rowRDD, schema)
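Note that hard-coding the six attributes in Row(...) ties the code to this particular schema. If the column list is produced dynamically, Row.fromSeq builds a Row from however many fields the split yields (a sketch, assuming each line has exactly one comma-separated value per schema field):
val rowRDD = sc.textFile("dir")
  .map(_.split(",", -1)) // -1 keeps trailing empty fields
  .map(attributes => Row.fromSeq(attributes.toSeq))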
Upvotes: 0
Reputation: 7207
The DataFrame with the field data can be collected, and each row's field added to a StructType:
import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Collect the (name, datatype) rows and fold them into a StructType
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), nullable = true)
)

// Map the datatype strings from the metadata to Spark SQL types
def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}
Upvotes: 2