kiran kumar
kiran kumar

Reputation: 91

Error while adding Schema to Spark DataFrame loaded from file

val tableDF = spark.read.option("delimiter",",").csv("/Volumes/Data/ap/click/test.csv")
import org.apache.spark.sql.types.{StringType, StructField, StructType, IntegerType}

val schemaTd = StructType(List(StructField("time_id",IntegerType),StructField("week",IntegerType),StructField("month",IntegerType),StructField("calendar",StringType)))

val result = spark.createDataFrame(tableDF,schemaTd)

test.csv data sample below

6659,951,219,2018-03-25 00:00:00
6641,949,219,2018-03-07 00:00:00
6645,949,219,2018-03-11 00:00:00
6638,948,219,2018-03-04 00:00:00
6646,950,219,2018-03-12 00:00:00
6636,948,219,2018-03-02 00:00:00
6643,949,219,2018-03-09 00:00:00

all the columns except last value are Int type in the file still getting an error

scala> result.show
2018-05-20 17:08:54 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, time_id), IntegerType) AS time_id#23
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, week), IntegerType) AS week#24
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, month), IntegerType) AS month#25
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, calendar), StringType), true, false) AS calendar#26
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)

Upvotes: 0

Views: 2436

Answers (1)

Alper t. Turker
Alper t. Turker

Reputation: 35249

In this case you should provide schema to DataFrameReader:

import org.apache.spark.sql.types._

val schemaTd = StructType(List(
   StructField("time_id",IntegerType),
   StructField("week",IntegerType),
   StructField("month",IntegerType),
   StructField("calendar",StringType)))

val tableDF = spark.read.option("delimiter",",")
  .schema(schemaTd)
  .csv("/Volumes/Data/ap/click/test.csv")

When Dataset is created from RDD[Row] (I assume your actual code is spark.createDataFrame(tableDF.rdd, schemaTd), otherwise it shouldn't really compile), types have to be consistent with schema. You cannot provide String (default type for CSV reader) and declare schema with IntegerType.

Upvotes: 1

Related Questions