Autumn

Reputation: 129

How to query columns that are missing from some records in Spark Streaming

I receive JSON records from Kafka in Spark Streaming:

{"name":"name","value1":"value1"}
{"name":"name","value1":"value1","value2":"value2"}

I read them and let Spark infer the schema:

    val df = spark.read.json(rdd.map(_._2))
    df.printSchema()
    // root
    //  |-- name: string (nullable = true)
    //  |-- value1: string (nullable = true)
    //  |-- value2: string (nullable = true)
    df.createOrReplaceTempView("df")
    spark.sql("select name, value2 from df")

but it fails with:

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

By the way, this works in plain Spark but fails in Spark Streaming. Does anybody know why?

Upvotes: 0

Views: 819

Answers (1)

Gourav Dutta

Reputation: 553

It seems that your code is not syntactically correct, so the Catalyst optimizer, while analyzing your query, cannot resolve all references against the catalog.

The .json() method takes either a path or an RDD[String], not a SQL expression.

A slight change in your code should resolve this error. Please find it below.

1. Using the Spark SQL AST

It seems you're using the SQL AST (as you're registering the DataFrame as a TempView). Change your code spark.read.json("select name, value2 from df") to the following:

    spark.sql("select name, value2 from df")

2. Using the Spark SQL DSL

Without creating a TempView, you can achieve the same thing (I mostly prefer this, as the DSL code is more concise):

    df.select("name", "value2")

In this case, just skip df.createOrReplaceTempView("df") and call the above instead.
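As for why it works in batch but fails in streaming: spark.read.json infers the schema from the data it is given, so a micro-batch containing only records without value2 produces a DataFrame with no value2 column, and the query fails analysis for that batch. A sketch of one way around this, assuming the two JSON shapes from the question, is to pass an explicit schema so value2 always exists (null when absent):

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    // Declare the full schema up front so every micro-batch has all
    // three columns, even when some batches omit value2 entirely.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("value1", StringType, nullable = true),
      StructField("value2", StringType, nullable = true)
    ))

    val df = spark.read.schema(schema).json(rdd.map(_._2))
    df.select("name", "value2")  // value2 is null for records that lack it

This avoids per-batch schema inference altogether, which is also cheaper, since inference requires an extra pass over the data.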

Hope this helps.

Upvotes: 1
