Reputation:
I'm new to the Spark world and am trying a Dataset example written in Scala that I found online.
When running it through SBT, I keep getting the following error:
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class
Any idea what I am overlooking?
Also, feel free to point out a better way of writing the same Dataset example.
Thanks
> sbt> runMain DatasetExample
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:306)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:302)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:302)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
at DatasetExample$.main(DatasetExample.scala:45)
at DatasetExample.main(DatasetExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
[error] (sparkExamples/compile:runMain) Nonzero exit code: 1
[error] Total time: 127 s, completed Oct 25, 2016 1:08:09 AM
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object DatasetExample {

  // Create data sets
  case class Student(name: String, dept: String, age: Long)
  case class Department(abbrevName: String, fullName: String)

  org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Not sure what exactly is the purpose

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // Initialise the Spark context.
    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._ // Not sure what exactly is the purpose

    // Read JSON objects into a Dataset[Student].
    val students = sqlcontext.read.json("student.json").as[Student]
    students.show()

    // Select two columns and filter on one column.
    // Each argument of "select" must be a "TypedColumn".
    students.select($"name".as[String], $"dept".as[String]).
      filter(_._2 == "Math"). // Filter on _2, the second selected column
      collect()

    // Group by department and count each group.
    students.groupBy(_.dept).count().collect()

    // Group and aggregate in each group.
    students.groupBy(_.dept).
      agg(avg($"age").as[Double]).
      collect()

    // Initialize a Seq and convert to a Dataset.
    val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

    // Show the contents of the Dataset.
    depts.show()

    // Join two datasets with "joinWith".
    val joined = students.joinWith(depts, $"dept" === $"abbrevName")

    // Show the contents of the joined Dataset.
    // Note that the original objects are nested into tuples under the _1 and _2 columns.
    joined.show()

    // Terminate the Spark context.
    sc.stop()
  }
}
JSON file (student.json):
{"id" : "1201", "name" : "Kris", "age" : "25"}
{"id" : "1202", "name" : "John", "age" : "28"}
{"id" : "1203", "name" : "Chet", "age" : "39"}
{"id" : "1204", "name" : "Mark", "age" : "23"}
{"id" : "1205", "name" : "Vic", "age" : "23"}
Upvotes: 8
Views: 8030
Reputation: 40370
This line is what is causing the problem:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
This means that you are adding a new outer scope to this context, to be used when instantiating an inner class during deserialization.
Inner classes like this are created when a case class is defined in the Spark REPL, and registering the outer scope that the class was defined in allows Spark to create new instances of it on the executors.
In normal use (your case), you shouldn't need to call this function.
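To make the inner-class point concrete, here is a small standalone sketch (hypothetical names, not from the question) showing why a case class nested inside another class needs a reference to its enclosing instance:

class Outer {
  // A case class nested inside another class is an inner class at the JVM
  // level: its constructor carries a hidden reference to the Outer instance.
  case class Inner(x: Int)
}

object InnerClassDemo extends App {
  val outer = new Outer
  val inner = new outer.Inner(42) // an Outer instance is required to construct Inner

  // The reflective constructor lists Outer as its first parameter, which is
  // exactly the reference Spark's encoder is missing in the error above.
  println(inner.getClass.getConstructors.head) // e.g. public Outer$Inner(Outer, int)
}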
EDIT: You'll also need to move your case classes outside of the DatasetExample object.
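For example, a minimal sketch of the corrected layout (assuming the same Spark 1.6-style SQLContext API the question uses; DatasetExampleFixed is a made-up name, and the JSON columns must actually match the case class fields):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case classes: no enclosing instance, so Spark can derive
// encoders for them without any outer-scope registration.
case class Student(name: String, dept: String, age: Long)
case class Department(abbrevName: String, fullName: String)

object DatasetExampleFixed {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Works now that Student is a top-level class; the JSON file read here
    // still needs to provide the name, dept and age fields.
    val students = sqlContext.read.json("student.json").as[Student]
    students.show()

    sc.stop()
  }
}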
Note:
import sqlContext.implicits._
is a Scala-specific import that brings into scope the implicit methods for converting common Scala collections and RDDs into DataFrames.
More on that here.
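For instance, a short sketch of what that import enables (reusing the sqlContext and the top-level Department case class from the sketch above):

import sqlContext.implicits._

// Local Seqs and RDDs gain toDF / toDS:
val deptDF = Seq(Department("CS", "Computer Science")).toDF()
val deptDS = Seq(Department("Math", "Mathematics")).toDS()

// The $"column" syntax used in the question also comes from this import:
deptDF.select($"abbrevName").show()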
Upvotes: 11