Reputation: 417
I have a set of big compressed JSON files with nested key-value pairs. There are about 70-80 keys (and subkeys) in each JSON object, but I am only interested in a few of them. I want to query the JSON files with Spark SQL, pick out only the key-value pairs I am interested in, and output them to a set of CSV files. It takes about 5 minutes to process one compressed JSON file of 170MB. I am wondering whether there is any way to optimize this process, or whether there are better tools than Spark for this kind of job. Thanks!
Here is a snippet of the Scala code I am using:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
  row => {
    val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T", " ")
    Row(timestamp, row(1), row(2), row(3), row(4))
  }
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
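One likely bottleneck in the code above is that `read.json` on an RDD scans the whole dataset once just to infer the schema before it can run the query. Since you only need a handful of keys, you can skip inference by supplying an explicit schema; Spark will then parse only the declared fields. A minimal sketch (the column names and types here are placeholders taken from the query above; substitute your real keys and types):

```scala
import org.apache.spark.sql.types._

// Declare only the fields you need; Spark parses these and skips
// schema inference over the other ~70 keys.
val schema = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", StringType),
  StructField("col4", StringType),
  StructField("col5", StringType)
))

val dataDF = sqlContext.read.schema(schema).json(distdata)
```

Also note that gzip is not a splittable format, so `sc.textFile("abcdefg.txt.gz")` is read by a single task no matter what; the `repartition(10)` only spreads the lines out afterwards, at the cost of a shuffle. If you can, store the input as multiple smaller `.gz` files (or use a splittable codec such as bzip2) so the initial read is parallel too.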
Upvotes: 3
Views: 8211
Reputation: 837
Here is an easy way to process JSON:
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
peopleDF.printSchema()
peopleDF.createOrReplaceTempView("people")
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
val otherPeopleRDD = spark.sparkContext.makeRDD(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
see doc: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
Upvotes: -1
Reputation: 13985
Let's say your JSON data looks like the following:
{ "c1": "timestamp_1", "c2": "12", "c3": "13", "c4": "14", "c5": "15", ... }
{ "c1": "timestamp_2", "c2": "22", "c3": "23", "c4": "24", "c5": "25", ... }
{ "c1": "timestamp_3", "c2": "32", "c3": "33", "c4": "34", "c5": "35", ... }
Now you can use a JSON library and plain RDDs to do the transformation and dump:
import play.api.libs.json._
val data = sc.textFile("abcdefg.txt.gz")
val jsonData = data.map(line => Json.parse(line))
// filter the rdd and just keep the values of interest
val filteredData = jsonData
  .filter(json => {
    val c1 = (json \ "c1").as[String]
    List[String]("c1_val1", "c1_val2", ...).contains(c1)
  })
// reformat the timestamps and transform to tuple
val result2 = filteredData
.map(json => {
val ts = (json \ "c1").as[String]
val tsFormated = ts.stripSuffix("Z").replace("T"," ")
(tsFormated, (json \ "c2").as[String], ...)
})
val output_file = "/root/target"
result2.saveAsTextFile(output_file)
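One caveat: `saveAsTextFile` on an RDD of tuples writes each element's `toString`, so every output line will be wrapped in parentheses, e.g. `(timestamp,22,...)`. To get plain comma-separated lines, join the fields yourself before saving (a sketch; tuples extend `Product`, so `productIterator` walks their fields):

```scala
// Join each tuple's fields with commas so the output has no parentheses
result2
  .map(_.productIterator.mkString(","))
  .saveAsTextFile(output_file)
```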
Upvotes: 2