Reputation: 68
Input Json
{"studentName": "abc","mailId": "[email protected]","class" : 7,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"newSub" : "Environment","grade" : "A","score" : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId |newSub |score|scoreBoard |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7 |A |[email protected]|Environment|95 |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]] |abc |
|8 |A |[email protected]|Environment|95 |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]] |xyz |
|9 |A |[email protected]|Environment|95 |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
Processing I want -
add newSub's json is scoreBoard list (read data from user row - newSub, score, grade)
sort them on score and remove the json from scoreBoard list having less score
Expected output -
{"studentName": "abc","mailId": "[email protected]","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId |scoreBoard |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7 |[email protected]|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc |
|8 |[email protected]|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]] |xyz |
|9 |[email protected]|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
I tried
1st way - UDF processing but Sorting and deleting json from scoreBoard column in UDF is challenging
2nd way - explode the column scoreBoard, got 6 row for single student, each for every subject. Challenge I am facing in this is, how to process data group wise, Like how to add new row for new Subject,sort each user's subject score and delete one row.
Need help to select way to solve this problem, if anyone know is there any new/different efficient way to do the same processing. Thanks!!
Upvotes: 0
Views: 162
Reputation: 2838
This approach is using Spark dataframes/datasets
and Spark SQL
.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
object ProcessingList {
val spark = SparkSession
.builder()
.appName("ProcessingList")
.master("local[*]")
.config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id","ProcessingList") // To silence Metrics warning
.getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
val input = "/home/cloudera/files/tests/list_processing.json"
case class Student(cl: Long, grade: String,mail : String,ns: String,score: Long,sbGrade: String, sbScore: Long,sbSubject: String, name: String)
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.ERROR)
try {
import spark.implicits._
val studentTest = sqlContext
.read
.json(input)
.flatMap(r => r.getSeq(5).map( (sq: Row) => Student(r.getLong(0), r.getString(1), r.getString(2), r.getString(3), r.getLong(4),sq.getString(0),sq.getLong(1), sq.getString(2), r.getString(6)))).as[Student]
.cache()
studentTest.show(truncate = false)
studentTest.createOrReplaceTempView("student_test")
sqlContext
.sql(
"""
|SELECT cl, grade, mail,ns, score,
|RANK() OVER(PARTITION BY cl ORDER BY sbScore DESC) AS ranking,
|sbGrade,sbScore, sbSubject, name
|FROM student_test
|ORDER BY cl
|""".stripMargin)
.show(truncate = false)
// To have the opportunity to view the web console of Spark: http://localhost:4041/
println("Type whatever to the console to exit......")
scala.io.StdIn.readLine()
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
and expected results
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|cl |grade|mail |ns |score|points|sbGrade|sbScore|sbSubject|name|
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|7 |A |[email protected]|Environment|95 |1 |A |90 |Math |abc |
|7 |A |[email protected]|Environment|95 |2 |A |82 |Science |abc |
|7 |A |[email protected]|Environment|95 |3 |A |80 |History |abc |
|7 |A |[email protected]|Environment|95 |3 |A |80 |English |abc |
|7 |A |[email protected]|Environment|95 |3 |A |80 |Geography|abc |
|7 |A |[email protected]|Environment|95 |6 |B |75 |Hindi |abc |
|8 |A |[email protected]|Environment|95 |1 |A |90 |Math |xyz |
|8 |A |[email protected]|Environment|95 |2 |A |87 |Biology |xyz |
|8 |A |[email protected]|Environment|95 |3 |A |85 |Physics |xyz |
|8 |A |[email protected]|Environment|95 |4 |A |80 |Chemistry|xyz |
|8 |A |[email protected]|Environment|95 |5 |B |75 |Hindi |xyz |
|8 |A |[email protected]|Environment|95 |6 |B |70 |English |xyz |
|9 |A |[email protected]|Environment|95 |1 |A |95 |Computer |efg |
|9 |A |[email protected]|Environment|95 |2 |A |91 |Math |efg |
|9 |A |[email protected]|Environment|95 |3 |A |82 |English |efg |
|9 |A |[email protected]|Environment|95 |4 |B |77 |Physics |efg |
|9 |A |[email protected]|Environment|95 |5 |B |76 |Biology |efg |
|9 |A |[email protected]|Environment|95 |6 |B |72 |Chemistry|efg |
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
Upvotes: 1
Reputation: 6228
import ss.implicits._
val schema = new ArrayType(new StructType(Array(
StructField("grade",DataTypes.StringType,true),
StructField("score",DataTypes.LongType,true),
StructField("subject",DataTypes.StringType,true))),true)
def addValue = udf((array: Seq[Row], newval:Row)=> array ++ Array(newval),schema)
def sortAndRemove = udf((array: Seq[Row])=> array.sortBy(x=>x.getAs[Long]("score"))(Ordering[Long].reverse).slice(0,array.length-1),schema)
val df2 = df.withColumn("map_col",struct(col("grade"),col("score"),col("newSub").as("subject")))
.withColumn("scoreBoard",sortAndRemove(addValue(col("scoreBoard"),col("map_col"))))
df2.select("scoreBoard").show(false)
UDF approach, where ss is SparkSession. addvalue can be replaced with array_union if using spark version 2.4 and above.
Above code will work for spark 2.0 and above
Upvotes: 1