Leo Bogod
Leo Bogod

Reputation: 77

How to convert Json to array in spark

I am trying to convert Json list to array in spark but am getting a weird error

grouping expressions sequence is empty, and 'Census_block_group' is not an aggregate function. Wrap '(collect_list(json) ASjsonval)' in windowing function(s) or wrap 'Census_block_group' in first() (or first_value) if you don't care which value you get.;;

Can someone explain what this is ? I tried to use first() but then it gives error saying first is not defined. any help is greatly appreciated!

import org.apache.spark.sql.functions._

val unPivotDF2= unPivotDF.select(to_json(struct(col("attribute"),
     col("value"))).as("json"), col("Census_block_group"))

val unPivotDF3=unPivotDF2.select(collect_list("json").as("jsonval"),
 col("Census_block_group"))  

Sample Data :

 Census_block_group|attribute|value|
+------------------+---------+-----+
|      010010201001| B08007e1|  291|

Upvotes: 1

Views: 199

Answers (1)

Ram Ghadiyaram
Ram Ghadiyaram

Reputation: 29237

need to use groupBy , agg like this

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession

object DFToJsonSample extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)
  val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[*]").getOrCreate()

  //  val cl = ClassLoader.getSystemClassLoader
  //  cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)
  import spark.sql

  val sampleDF = sql("""select "010010201001" as Census_block_group  , "B08007e1"  as attribute, 291 as value""")
    .union(sql("""select "010010201002" as Census_block_group  , "B08007e2"  as attribute, 292 as value"""))
    .union(sql("""select "010010201001" as Census_block_group  , "B08007e1"  as attribute, 291 as value"""))
  sampleDF.show(false)

  import org.apache.spark.sql.functions._

  val sampleDF1 = sampleDF
    .select(to_json(struct(col("attribute"), col("value"))).as("json"), col("Census_block_group"))
  sampleDF1.show(false)
  val unPivotDF3 = sampleDF1.groupBy("Census_block_group")
    .agg(collect_list("json").as("jsonval"))
    .show(false)

}


Result :

+------------------+---------+-----+
|Census_block_group|attribute|value|
+------------------+---------+-----+
|010010201001      |B08007e1 |291  |
|010010201002      |B08007e2 |292  |
|010010201001      |B08007e1 |291  |
+------------------+---------+-----+

+------------------------------------+------------------+
|json                                |Census_block_group|
+------------------------------------+------------------+
|{"attribute":"B08007e1","value":291}|010010201001      |
|{"attribute":"B08007e2","value":292}|010010201002      |
|{"attribute":"B08007e1","value":291}|010010201001      |
+------------------------------------+------------------+

+------------------+----------------------------------------------------------------------------+
|Census_block_group|jsonval                                                                     |
+------------------+----------------------------------------------------------------------------+
|010010201001      |[{"attribute":"B08007e1","value":291}, {"attribute":"B08007e1","value":291}]|
|010010201002      |[{"attribute":"B08007e2","value":292}]                                      |
+------------------+----------------------------------------------------------------------------+

Upvotes: 1

Related Questions