Reputation: 199
I am using Spark 1.6.1, built with Scala 2.10.5. I am examining some weather data, which sometimes contains decimal values. Here is the code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
val rawData = sc.textFile("Example_Weather.csv").map(_.split(","))
val header = rawData.first
val rawDataNoHeader = rawData.filter(_(0) != header(0))
rawDataNoHeader.first
object schema {
  val weatherdata = StructType(Seq(
    StructField("date", StringType, true),
    StructField("Region", StringType, true),
    StructField("Temperature", DecimalType(32,16), true),
    StructField("Solar", IntegerType, true),
    StructField("Rainfall", DecimalType(32,16), true),
    StructField("WindSpeed", DecimalType(32,16), true))
  )
}
val dataDF = sqlContext.createDataFrame(rawDataNoHeader.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))), schema.weatherdata)
dataDF.registerTempTable("weatherdataSQL")
val datasql = sqlContext.sql("SELECT * FROM weatherdataSQL")
datasql.collect().foreach(println)
When running the code, I get what is expected for the schema and the sqlContext:
scala> object schema {
| val weatherdata= StructType(Seq(
| StructField("date", StringType, true),
| StructField("Region", StringType, true),
| StructField("Temperature", DecimalType(32,16), true),
| StructField("Solar", IntegerType, true),
| StructField("Rainfall", DecimalType(32,16), true),
| StructField("WindSpeed", DecimalType(32,16), true))
| )
| }
16/09/24 09:40:58 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:56288 in memory (size: 4.6 KB, free: 511.1 MB)
16/09/24 09:40:58 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:39349 in memory (size: 4.6 KB, free: 2.7 GB)
16/09/24 09:40:58 INFO ContextCleaner: Cleaned accumulator 2
16/09/24 09:40:58 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost in memory (size: 1964.0 B, free: 511.1 MB)
16/09/24 09:40:58 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:41412 in memory (size: 1964.0 B, free: 2.7 GB)
16/09/24 09:40:58 INFO ContextCleaner: Cleaned accumulator 1
defined module schema
scala> val dataDF=sqlContext.createDataFrame(rawDataNoHeader.map(p=>Row(p(0),p(1),p(2),p(3),p(4),p(5))), schema.weatherdata)
dataDF: org.apache.spark.sql.DataFrame = [date: string, Region: string, Temperature: decimal(32,16), Solar: int, Rainfall: decimal(32,16), WindSpeed: decimal(32,16)]
However, the last line of code gives me the following:
16/09/24 09:41:03 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): scala.MatchError: 20.21666667 (of class java.lang.String)
The number 20.21666667 is indeed the first temperature observed for a specific geographical region. I thought I had successfully specified Temperature as a DecimalType(32,16). Is there a problem with my code, or with the sqlContext I am calling?
As recommended, I changed dataDF as follows:
val dataDF = sqlContext.createDataFrame(rawDataNoHeader.map(p => Row(p(0), p(1), BigDecimal(p(2)), p(3), BigDecimal(p(4)), BigDecimal(p(5)))), schema.weatherdata)
Unfortunately, I now get a casting problem:
16/09/24 10:31:35 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
Upvotes: 1
Views: 3875
Reputation: 330093
Since you know the expected schema, it is better to skip manual parsing and use a proper input format. For Spark 1.6 / Scala 2.10, include the spark-csv package (--packages com.databricks:spark-csv_2.10:1.4.0) and:
val sqlContext: SQLContext = ???
val path: String = ???
sqlContext.read
  .format("csv")
  .schema(schema.weatherdata)
  .option("header", "true")
  .load(path)
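Applied to the file from the question, this looks roughly as follows (a sketch: Example_Weather.csv and schema.weatherdata are taken from the question, and the shell has to be started with the package on the classpath):

// Start the shell with: spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
val dataDF = sqlContext.read
  .format("com.databricks.spark.csv") // fully qualified name; the short name "csv" also resolves with spark-csv 1.4
  .schema(schema.weatherdata)
  .option("header", "true")
  .load("Example_Weather.csv")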
For 2.0+:
val spark: SparkSession = ???
val path: String = ???
spark.read
  .format("csv")
  .schema(schema.weatherdata)
  .option("header", "true")
  .load(path)
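Either way the columns come back already typed, so the manual Row construction and BigDecimal conversions become unnecessary. A quick check (a sketch; dataDF stands for the DataFrame returned by either load above):

// With an explicit schema the reader yields typed columns, not raw Strings:
dataDF.printSchema()
// root
//  |-- date: string (nullable = true)
//  |-- Region: string (nullable = true)
//  |-- Temperature: decimal(32,16) (nullable = true)
//  |-- Solar: integer (nullable = true)
//  |-- Rainfall: decimal(32,16) (nullable = true)
//  |-- WindSpeed: decimal(32,16) (nullable = true)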
Upvotes: 0
Reputation: 1596
The code in your first edit was almost correct - p(3) had to be converted with .toInt, since Solar is declared as IntegerType in the schema (leaving it a String is what caused the ClassCastException).
I created a sample CSV file without headers:
2016,a,201.222,12,12.1,5.0
2016,b,200.222,13,12.3,6.0
2014,b,200.111,14,12.3,7.0
The results:
val dataDF = sqlContext.createDataFrame(rawData.map(p => Row(p(0), p(1), BigDecimal(p(2)), p(3).toInt, BigDecimal(p(4)), BigDecimal(p(5)))), schema.weatherdata)
dataDF.show
+----+------+--------------------+-----+-------------------+------------------+
|date|Region| Temperature|Solar| Rainfall| WindSpeed|
+----+------+--------------------+-----+-------------------+------------------+
|2016| a|201.2220000000000000| 12|12.1000000000000000|5.0000000000000000|
|2016| b|200.2220000000000000| 13|12.3000000000000000|6.0000000000000000|
|2014| b|200.1110000000000000| 14|12.3000000000000000|7.0000000000000000|
+----+------+--------------------+-----+-------------------+------------------+
Upvotes: 1
Reputation: 578
It may be because you are reading this data from a .csv file, and by default everything is read as Text/String. You can solve this problem in two ways:
1. Change the data type for the temperature attribute in the .csv file.
2. Convert the value in code: val temperatureInDecimal = BigDecimal("20.21666667")
I would recommend the second approach if you want to future-proof your application, since the .csv file can change.
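For reference, plugging the second approach into the pipeline from the question would look something like this (a sketch; note that p(3) also needs .toInt, as pointed out in the other answer):

// Convert each numeric field from String to the type the schema declares:
val dataDF = sqlContext.createDataFrame(
  rawDataNoHeader.map(p => Row(p(0), p(1), BigDecimal(p(2)), p(3).toInt, BigDecimal(p(4)), BigDecimal(p(5)))),
  schema.weatherdata)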
Upvotes: 0