val tableDF = spark.read.option("delimiter", ",").csv("/Volumes/Data/ap/click/test.csv")

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schemaTd = StructType(List(
  StructField("time_id", IntegerType),
  StructField("week", IntegerType),
  StructField("month", IntegerType),
  StructField("calendar", StringType)))

val result = spark.createDataFrame(tableDF, schemaTd)
A sample of the data in test.csv:
6659,951,219,2018-03-25 00:00:00
6641,949,219,2018-03-07 00:00:00
6645,949,219,2018-03-11 00:00:00
6638,948,219,2018-03-04 00:00:00
6646,950,219,2018-03-12 00:00:00
6636,948,219,2018-03-02 00:00:00
6643,949,219,2018-03-09 00:00:00
All columns except the last are Int in the file, but I still get an error:
scala> result.show
2018-05-20 17:08:54 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, time_id), IntegerType) AS time_id#23
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, week), IntegerType) AS week#24
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, month), IntegerType) AS month#25
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, calendar), StringType), true, false) AS calendar#26
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
In this case you should provide the schema to the DataFrameReader:
import org.apache.spark.sql.types._

val schemaTd = StructType(List(
  StructField("time_id", IntegerType),
  StructField("week", IntegerType),
  StructField("month", IntegerType),
  StructField("calendar", StringType)))

val tableDF = spark.read.option("delimiter", ",")
  .schema(schemaTd)
  .csv("/Volumes/Data/ap/click/test.csv")
When a Dataset is created from an RDD[Row] (I assume your actual code is spark.createDataFrame(tableDF.rdd, schemaTd); otherwise it shouldn't really compile), the runtime types have to be consistent with the schema. You cannot provide String values (the default type produced by the CSV reader) and declare the schema with IntegerType.
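Alternatively, if the DataFrame has already been read with the default all-String schema, the existing columns can be cast to the desired types instead of re-declaring the schema. A minimal sketch, using inline sample data to stand in for the CSV file:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder.master("local[1]").appName("cast-demo").getOrCreate()
import spark.implicits._

// All-String columns, as produced by the schemaless CSV reader
val raw = Seq(("6659", "951", "219", "2018-03-25 00:00:00"))
  .toDF("time_id", "week", "month", "calendar")

// Cast in place rather than imposing a new schema via createDataFrame
val typed = raw.select(
  col("time_id").cast(IntegerType),
  col("week").cast(IntegerType),
  col("month").cast(IntegerType),
  col("calendar"))
```

Note that `cast` silently yields null for unparseable values, whereas the `.schema(...)` approach lets the reader's parse mode (PERMISSIVE, DROPMALFORMED, FAILFAST) decide how bad rows are handled.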