Ishan Kumar

Reputation: 1982

How to specify a schema for a CSV file without using a Scala case class?

I am loading a CSV file into a DataFrame as shown below.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setAppName("dataframes").setMaster("local")
val sc = new SparkContext(conf)
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = spark.
  read.
  format("org.apache.spark.csv").
  option("header", true).
  csv("/home/cloudera/Book1.csv")
scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)

How do I change the age column to be of type Int?

Upvotes: 15

Views: 33298

Answers (3)

vdep

Reputation: 3590

There is an inferSchema option that automatically recognizes the type of each column:

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", true)
  .option("inferSchema", true) // <-- HERE
  .csv("/home/cloudera/Book1.csv")

spark-csv was originally an external library by Databricks, but it has been included in core Spark from version 2.0 onwards. You can refer to the documentation on the library's GitHub page to find the available options.
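
For example, a few of the other options the CSV reader accepts (option names per the Spark 2.x CSV documentation; the values here are only illustrative):

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", true)
  .option("inferSchema", true)
  .option("sep", ",")        // field delimiter, "," by default
  .option("nullValue", "NA") // string to be parsed as null
  .csv("/home/cloudera/Book1.csv")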

Upvotes: 20

Jacek Laskowski

Reputation: 74619

Given val spark = SparkSession.builder().getOrCreate(), I guess you're using Spark 2.x.


First of all, please note that Spark 2.x has native support for the CSV format and as such does not require specifying the format by its long name, i.e. org.apache.spark.csv, but just csv.

spark.read.format("csv")...

Since you use the csv operator, the CSV format is implied, so you can skip/remove format("csv").

// note that I removed format("csv")
spark.read.option("header", true).csv("/home/cloudera/Book1.csv")

With that you have plenty of options, but I strongly recommend using a case class for...just the schema. See the last solution if you're curious how to do it in Spark 2.0.

cast operator

You could use cast operator.

scala> Seq("1").toDF("str").withColumn("num", 'str cast "int").printSchema
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = true)
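
Applied to the question's DataFrame, a minimal sketch (it assumes import spark.implicits._ is in scope for the 'age syntax, as in the question):

// Cast the age column in place; malformed values become null
val withIntAge = df.withColumn("age", 'age cast "int")
withIntAge.printSchema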

Using StructType

You can also use your own hand-crafted schema with StructType and StructField as follows:

import org.apache.spark.sql.types._    
val schema = StructType(
  StructField("str", StringType, true) :: 
  StructField("num", IntegerType, true) :: Nil)

scala> schema.printTreeString
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = true)

val q = spark.
  read.
  option("header", true).
  schema(schema).
  csv("numbers.csv")
scala> q.printSchema
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = true)
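
The same approach applied to the question's file would look like this (a sketch; the column names are taken from the question's printed schema):

import org.apache.spark.sql.types._
val bookSchema = StructType(
  StructField("name", StringType, true) ::
  StructField("address", StringType, true) ::
  StructField("age", IntegerType, true) :: Nil)

val book = spark.
  read.
  option("header", true).
  schema(bookSchema).
  csv("/home/cloudera/Book1.csv")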

Schema DSL

What I have found quite interesting lately is the so-called Schema DSL. The above schema built using StructType and StructField can be re-written as follows:

import org.apache.spark.sql.types._
val schema = StructType(
  $"str".string ::
  $"num".int :: Nil) 
scala> schema.printTreeString
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = true)

// or even
val schema = new StructType().
  add($"str".string).
  add($"num".int)
scala> schema.printTreeString
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = true)

Encoders

Encoders are so easy to use that it's hard to believe you wouldn't want them, if only to build a schema without dealing with StructType, StructField and DataType.

// Define a business object that describes your dataset
case class MyRecord(str: String, num: Int)

// Use Encoders object to create a schema off the business object
import org.apache.spark.sql.Encoders    
val schema = Encoders.product[MyRecord].schema
scala> schema.printTreeString
root
 |-- str: string (nullable = true)
 |-- num: integer (nullable = false)
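
The derived schema can then be handed to the CSV reader just like a hand-written one. A sketch for the question's file (Person is a name I made up for illustration):

case class Person(name: String, address: String, age: Int)
val bookSchema = Encoders.product[Person].schema

val people = spark.
  read.
  option("header", true).
  schema(bookSchema).
  csv("/home/cloudera/Book1.csv")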

Upvotes: 31

Shivansh

Reputation: 3544

What you can do is use a UDF in this case:

Step 1: Create a UDF that converts a String to an Int.

import org.apache.spark.sql.functions.udf

val stringToIntUDF = udf((value: String) => value.toInt)

Step 2: Apply the UDF to the column you want to convert.

val updatedDF = df.withColumn("age", stringToIntUDF(df("age")))
updatedDF.printSchema

This should give you the desired result.
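
That said, the built-in cast (see the cast operator in the answer above) does the same conversion without a UDF: a cast stays visible to Spark's optimizer, and it turns malformed values into null instead of throwing the way value.toInt would.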

If you just want to infer the schema from the CSV file, then @vdep's solution does the right thing:

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", true)
  .option("inferSchema", "true") // <-- HERE
  .csv("/home/cloudera/Book1.csv")

Upvotes: -2
