Mayuri

Reputation: 68

How to process an array-of-JSON column in a Spark SQL DataFrame

Input JSON

{"studentName": "abc","mailId": "[email protected]","class" : 7,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId       |newSub     |score|scoreBoard                                                                                      |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7    |A    |[email protected]|Environment|95   |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]]   |abc        |
|8    |A    |[email protected]|Environment|95   |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]]   |xyz        |
|9    |A    |[email protected]|Environment|95   |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg        |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+

Processing I want:

  1. Add newSub as a JSON entry in the scoreBoard list (reading newSub, score, and grade from the student's row).

  2. Sort the scoreBoard list by score and remove the entry with the lowest score.

Expected output -

{"studentName": "abc","mailId": "[email protected]","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId       |scoreBoard                                                                                         |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7    |[email protected]|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc        |
|8    |[email protected]|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]]  |xyz        |
|9    |[email protected]|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg        |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+

I tried

1st way - UDF processing, but sorting and deleting a JSON entry from the scoreBoard column inside a UDF is challenging.

2nd way - explode the scoreBoard column, which gives 6 rows per student, one for each subject. The challenge I am facing here is how to process the data group-wise: how to add a new row for the new subject, sort each student's subject scores, and delete one row. A rough sketch of what I tried is below.
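
Roughly what I tried for the 2nd way (a sketch with my column names, df being the DataFrame read from the JSON; I am stuck at the group-wise part):

import org.apache.spark.sql.functions._

// one row per (student, subject) after exploding scoreBoard
val exploded = df
  .withColumn("sb", explode(col("scoreBoard")))
  .select(col("studentName"), col("class"), col("mailId"),
    col("newSub"), col("score"), col("grade"),
    col("sb.subject").as("subSubject"),
    col("sb.score").as("subScore"),
    col("sb.grade").as("subGrade"))

// stuck here: per student, add a row for newSub, drop the row with the
// lowest subScore, and collect the rows back into a scoreBoard array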

I need help choosing a way to solve this problem. If anyone knows a new or more efficient way to do the same processing, please let me know. Thanks!!

Upvotes: 0

Views: 162

Answers (2)

Chema

Reputation: 2838

This approach uses Spark DataFrames/Datasets and Spark SQL.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}

object ProcessingList {
  val spark = SparkSession
    .builder()
    .appName("ProcessingList")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ProcessingList") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  val input = "/home/cloudera/files/tests/list_processing.json"

  case class Student(cl: Long, grade: String, mail: String, ns: String, score: Long,
                     sbGrade: String, sbScore: Long, sbSubject: String, name: String)

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      // explode the scoreBoard array: one Student row per (student, subject);
      // field indices follow the alphabetical column order inferred from the JSON
      val studentTest = sqlContext
        .read
        .json(input)
        .flatMap(r => r.getSeq(5).map((sq: Row) =>
          Student(r.getLong(0), r.getString(1), r.getString(2), r.getString(3), r.getLong(4),
            sq.getString(0), sq.getLong(1), sq.getString(2), r.getString(6))))
        .as[Student]
        .cache()

      studentTest.show(truncate = false)

      studentTest.createOrReplaceTempView("student_test")

      sqlContext
          .sql(
            """
              |SELECT cl, grade, mail, ns, score,
              |RANK() OVER(PARTITION BY cl ORDER BY sbScore DESC) AS points,
              |sbGrade, sbScore, sbSubject, name
              |FROM student_test
              |ORDER BY cl
              |""".stripMargin)
          .show(truncate = false)


      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

and the expected results are:

+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|cl |grade|mail         |ns         |score|points|sbGrade|sbScore|sbSubject|name|
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|7  |A    |[email protected]|Environment|95   |1     |A      |90     |Math     |abc |
|7  |A    |[email protected]|Environment|95   |2     |A      |82     |Science  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |History  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |English  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |Geography|abc |
|7  |A    |[email protected]|Environment|95   |6     |B      |75     |Hindi    |abc |
|8  |A    |[email protected]|Environment|95   |1     |A      |90     |Math     |xyz |
|8  |A    |[email protected]|Environment|95   |2     |A      |87     |Biology  |xyz |
|8  |A    |[email protected]|Environment|95   |3     |A      |85     |Physics  |xyz |
|8  |A    |[email protected]|Environment|95   |4     |A      |80     |Chemistry|xyz |
|8  |A    |[email protected]|Environment|95   |5     |B      |75     |Hindi    |xyz |
|8  |A    |[email protected]|Environment|95   |6     |B      |70     |English  |xyz |
|9  |A    |[email protected]|Environment|95   |1     |A      |95     |Computer |efg |
|9  |A    |[email protected]|Environment|95   |2     |A      |91     |Math     |efg |
|9  |A    |[email protected]|Environment|95   |3     |A      |82     |English  |efg |
|9  |A    |[email protected]|Environment|95   |4     |B      |77     |Physics  |efg |
|9  |A    |[email protected]|Environment|95   |5     |B      |76     |Biology  |efg |
|9  |A    |[email protected]|Environment|95   |6     |B      |72     |Chemistry|efg |
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
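
If the goal is also to finish the processing asked for (add newSub as one more row per student, drop that student's lowest score, and collect the rows back into an array), one possible continuation of this approach is sketched below. It reuses the studentTest Dataset from the code above and was not part of the run shown; note that collect_list does not guarantee the ordering inside the rebuilt array.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, row_number, struct}

// 1) add newSub as one more (grade, score, subject) row per student
val newSubRows = studentTest
  .select(col("cl"), col("mail"), col("name"),
    col("grade").as("sbGrade"), col("score").as("sbScore"), col("ns").as("sbSubject"))
  .distinct()

val allRows = studentTest
  .select(col("cl"), col("mail"), col("name"), col("sbGrade"), col("sbScore"), col("sbSubject"))
  .union(newSubRows)

// 2) drop the single lowest-scoring row per student and rebuild the array
val lowestFirst = Window.partitionBy(col("cl")).orderBy(col("sbScore").asc)
allRows
  .withColumn("rn", row_number().over(lowestFirst))
  .filter(col("rn") > 1)
  .groupBy(col("cl"), col("mail"), col("name"))
  .agg(collect_list(struct(col("sbGrade"), col("sbScore"), col("sbSubject"))).as("scoreBoard"))
  .show(truncate = false)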

Upvotes: 1

undefined_variable

Reputation: 6228

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import ss.implicits._

// element type of the scoreBoard column: an array of (grade, score, subject) structs
val schema = new ArrayType(new StructType(Array(
  StructField("grade", DataTypes.StringType, true),
  StructField("score", DataTypes.LongType, true),
  StructField("subject", DataTypes.StringType, true))), true)

// append the new subject's struct to the existing array
def addValue = udf((array: Seq[Row], newval: Row) => array ++ Array(newval), schema)

// sort descending by score and drop the last (lowest-scoring) element
def sortAndRemove = udf((array: Seq[Row]) =>
  array.sortBy(x => x.getAs[Long]("score"))(Ordering[Long].reverse).slice(0, array.length - 1), schema)

val df2 = df.withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
  .withColumn("scoreBoard", sortAndRemove(addValue(col("scoreBoard"), col("map_col"))))

df2.select("scoreBoard").show(false)

This is a UDF approach, where ss is the SparkSession. addValue can be replaced with array_union (wrapping the new struct in array()) if you are using Spark 2.4 or above.

The above code will work for Spark 2.0 and above.
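
For reference, a rough sketch of that array_union variant (Spark 2.4+), reusing the same df, sortAndRemove, and column names as above:

import org.apache.spark.sql.functions.{array, array_union, col, struct}

// Spark 2.4+: append the new subject's struct without a UDF.
// Note: array_union also de-duplicates, which is harmless here.
val appended = df
  .withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
  .withColumn("scoreBoard", array_union(col("scoreBoard"), array(col("map_col"))))

// The descending sort and removal of the lowest-scoring entry can still be done
// with the sortAndRemove UDF above (a comparator-based array_sort only arrives in Spark 3.x).
appended
  .withColumn("scoreBoard", sortAndRemove(col("scoreBoard")))
  .select("scoreBoard")
  .show(false)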

Upvotes: 1
