AJm

Reputation: 1023

Spark - creating schema programmatically with different data types

I have a dataset consisting of 7-8 fields which are of type String, Int & Float.

I am trying to create the schema programmatically like this:

val schema = StructType(header.split(",").map(column => StructField(column, StringType, true)))

Then I map each line to a Row like this:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")).map(col => Row(col(0).trim, col(1).toInt, col(2).toFloat, col(3), col(4) ,col(5), col(6), col(7), col(8)))

But after creating the DataFrame, calling DF.show() gives an error for the Integer field.

So how do I create a schema when the dataset contains multiple data types?

Upvotes: 3

Views: 6897

Answers (2)

AJm

Reputation: 1023

Define the StructType first:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema1 = StructType(Array(
  StructField("AcutionId", StringType, true),
  StructField("Bid", IntegerType, false),
  StructField("BidTime", FloatType, false),
  StructField("Bidder", StringType, true),
  StructField("BidderRate", FloatType, false),
  StructField("OpenBid", FloatType, false),
  StructField("Price", FloatType, false),
  StructField("Item", StringType, true),
  StructField("DaystoLive", IntegerType, false)
))

Then build each Row by converting every column to the type declared for it in the schema:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(","))
  .map(col => Row(
    col(0).trim,
    col(1).trim.toInt,
    col(2).trim.toFloat,
    col(3).trim,
    col(4).trim.toFloat,
    col(5).trim.toFloat,
    col(6).trim.toFloat,
    col(7).trim,
    col(8).trim.toInt)
  )

Then apply the schema to the RDD:

val auctionDF = spark.sqlContext.createDataFrame(dataRdd,schema1)

Upvotes: 2

elghoto

Reputation: 303

The problem in your code is that the schema declares every field as StringType, while the Rows contain Int and Float values, so the data does not match the schema.

If the header contains only the field names, the types cannot be guessed from it.

Let's assume that the header string looks like this:

val header = "field1:Int,field2:Double,field3:String"

Then the code should be:

def inferType(field: String) = field.split(":")(1) match {
   case "Int" => IntegerType
   case "Double" => DoubleType
   case "String" => StringType
   case _ => StringType
}

val schema = StructType(header.split(",").map(column => StructField(column, inferType(column), true)))

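For the example header string you get the following schema (note that the column names keep the ":Type" suffix, because the whole header entry is used as the field name):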

root
 |-- field1:Int: integer (nullable = true)
 |-- field2:Double: double (nullable = true)
 |-- field3:String: string (nullable = true)
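If the ":Type" suffix is not wanted in the column names, each header entry can be split into a name and a type label before building the field. The following is only a minimal sketch under the same assumed "name:Type" header format; the helper name toDataType is made up for illustration.

```scala
import org.apache.spark.sql.types._

// Assumed header format (same as the example above): "name:Type" pairs separated by commas.
val header = "field1:Int,field2:Double,field3:String"

// Hypothetical helper: map a type label to a Spark SQL data type.
// Unknown labels fall back to StringType.
def toDataType(label: String): DataType = label match {
  case "Int"    => IntegerType
  case "Double" => DoubleType
  case _        => StringType
}

// Split each entry so that only the part before ":" becomes the column name.
val schema = StructType(header.split(",").map { entry =>
  val parts = entry.split(":")
  val name  = parts(0).trim
  val label = if (parts.length > 1) parts(1).trim else "String"
  StructField(name, toDataType(label), nullable = true)
})

schema.printTreeString()
```

With this variant the columns are named field1, field2 and field3, and an entry without a type label is treated as a string.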

On the other hand, if what you need is a DataFrame from a text file, I would suggest creating the DataFrame directly from the file itself. There is no need to go through an RDD.

val fileReader = spark.read.format("com.databricks.spark.csv")
  .option("mode", "DROPMALFORMED")
  .option("header", "true")
  .option("inferschema", "true")
  .option("delimiter", ",")

val df = fileReader.load(PATH_TO_FILE)
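If the column types are already known, the reader can also be given an explicit schema instead of inferring one. The sketch below assumes Spark 2.x or later, where the CSV source is built in; the three-column schema, the temporary file and its rows are made up so the example runs on its own, and are not the asker's data.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// In spark-shell a session already exists; getOrCreate() simply reuses it.
val spark = SparkSession.builder()
  .appName("csv-with-schema")
  .master("local[*]")
  .getOrCreate()

// Made-up sample file so the sketch is self-contained; use the real path instead.
val samplePath = Files.createTempFile("auction-sample", ".csv")
Files.write(samplePath,
  "AuctionId,Bid,Price\na1,10,1.5\na2,20,2.5\n".getBytes(StandardCharsets.UTF_8))

// Hypothetical schema covering only three of the columns, for brevity.
val sampleSchema = StructType(Seq(
  StructField("AuctionId", StringType, nullable = true),
  StructField("Bid", IntegerType, nullable = true),
  StructField("Price", FloatType, nullable = true)
))

// The header line is skipped and each column is parsed as the declared type.
val typedDf = spark.read
  .option("header", "true")
  .schema(sampleSchema)
  .csv(samplePath.toString)

typedDf.printSchema()
```

Supplying the schema avoids the extra pass over the file that schema inference needs, and it keeps the column types stable even if a column happens to contain only integer-looking values.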

Upvotes: 5
