Reputation: 129
I get JSON messages from Kafka in Spark Streaming:
{"name":"name","value1":"value1"}
{"name":"name","value1":"value1","value2":"value2"}
I read them and infer the schema:
val df = spark.read.json(rdd.map(_._2))
df.printSchema() // shows:
// |-- name
// |-- values1
// |-- values2
df.createOrReplaceTempView("df")
spark.sql("select name,values2 from df")
but it fails with:
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
By the way, the same code works in plain Spark but fails in Spark Streaming. Does anybody know why?
Upvotes: 0
Views: 819
Reputation: 553
It seems that your code is not syntactically correct, so the Catalyst optimizer, while analyzing your query, cannot resolve all references against the catalog. The .json() method takes either a path or an RDD[String], not a SQL expression. A slight change to your code should resolve this error; please see below.
1. Using the Spark SQL AST
It seems you're using the SQL AST (since you're registering the DataFrame as a temp view). If you passed the query string to spark.read.json("select name,values2 from df"), change it to the following:
spark.sql("select name,values2 from df")
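As a minimal, self-contained sketch of this approach (the field names follow the query and printed schema from the question, i.e. values2; note the question's sample JSON actually spells the fields value1/value2, which would itself make values2 unresolvable — and the local SparkSession setup here is my assumption, not part of the original code):

```scala
import org.apache.spark.sql.SparkSession

object SqlAstExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sql-ast-example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Sample JSON lines standing in for the streaming input;
    // in the question these come from rdd.map(_._2).
    val jsonLines = Seq(
      """{"name":"name","values1":"value1"}""",
      """{"name":"name","values1":"value1","values2":"value2"}"""
    ).toDS()

    // Infer the schema from the data and register a temp view.
    val df = spark.read.json(jsonLines)
    df.createOrReplaceTempView("df")

    // Query via the SQL AST; this resolves because values2 is
    // present in the inferred schema of this batch.
    spark.sql("select name, values2 from df").show()

    spark.stop()
  }
}
```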
2. Using the Spark SQL DSL
Without creating a temp view, you can achieve the same result (I mostly prefer this, as DSL code is more concise):
df.select("name","values2")
Just skip df.createOrReplaceTempView("df") in this case and call the line above instead.
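The DSL variant can be sketched the same way (again assuming a local SparkSession and the values1/values2 field names used by the query; neither is part of the original snippet). One caveat worth noting for the streaming case: spark.read.json infers the schema per micro-batch, so a batch in which no record carries values2 will not have that column at all, and both the SQL and DSL forms will then fail with an unresolved-reference error; supplying an explicit schema via DataFrameReader.schema(...) avoids that dependence on per-batch inference.

```scala
import org.apache.spark.sql.SparkSession

object SqlDslExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sql-dsl-example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val jsonLines = Seq(
      """{"name":"name","values1":"value1"}""",
      """{"name":"name","values1":"value1","values2":"value2"}"""
    ).toDS()

    val df = spark.read.json(jsonLines)

    // No temp view needed; select the columns directly.
    df.select("name", "values2").show()

    spark.stop()
  }
}
```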
Hope this helps.
Upvotes: 1