Reputation: 96
I have a JSON file whose records have the structure [{"time","currentStop","lat","lon","speed"}]. Here is an example:
[
{"time":"2015-06-09 23:59:59","currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"},
{"time":"2015-06-09 21:00:49","currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},
{"time":"2015-06-09 21:55:49","currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}
]
I want to get a JSON result with the structure [{"hour","value":["currentStop","lat","lon","speed"]}]: for each hour, "value" holds the distinct ("currentStop","lat","lon","speed") tuples from that hour. Here is the expected result for the example (some empty hours omitted):
[
{"hour":0,"value":[]},
{"hour":1,"value":[]},
......
{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}
{"hour":23, "value": [{"currentStop":"xx","lat":22.264856,"lon":113.520450,"speed":25.30}]},
]
Is it possible to achieve this with a Spark SQL query?
I use Spark with the Java API. With a loop I can get what I want, but this approach is really inefficient and expensive.
Here is my code:
Dataset<Row> bus_ic = spark.read().json(file);

// Cast the string columns to usable types and register the view.
bus_ic.select(bus_ic.col("currentStop"),
        bus_ic.col("lon").cast("double"), bus_ic.col("speed").cast("double"),
        bus_ic.col("lat").cast("double"), bus_ic.col("LINEID"),
        bus_ic.col("time").cast("timestamp"))
    .createOrReplaceTempView("view");

// One query per hour; the results are stitched into a JSON string by hand.
StringBuilder text = new StringBuilder("[");
StringBuilder sqlString = new StringBuilder();
for (int i = 0; i < 24; i++) {
    sqlString.delete(0, sqlString.length());
    sqlString.append("select currentStop, speed, lat, lon from view where hour(time) = ")
             .append(i)
             .append(" group by currentStop, speed, lat, lon");
    Dataset<Row> t = spark.sql(sqlString.toString());
    text.append("{")
        .append("\"hour\":").append(i)   // key renamed from "h" to match the desired output
        .append(",\"value\":")
        .append(t.toJSON().collectAsList().toString())
        .append("}");
    if (i != 23) text.append(",");
}
text.append("]");
There must be a better way to solve this problem. How can I write an efficient Spark SQL query to achieve this goal?
Upvotes: 0
Views: 56
Reputation: 35219
You can write your code in a much more concise way (Scala code):
import org.apache.spark.sql.functions._

val bus_comb = bus_ic
  .groupBy(hour(to_timestamp(col("time"))).as("hour"))
  .agg(collect_set(struct(
    col("currentStop"), col("lat"), col("lon"), col("speed")
  )).alias("value"))

bus_comb.toJSON.show(false)
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |value |
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |{"hour":23,"value":[{"currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"}]} |
// |{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}|
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
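Since you use the Java API, the same aggregation translates almost one-to-one. A minimal sketch, assuming the bus_ic Dataset from the question (busComb is just a hypothetical variable name):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Group by the hour of the timestamp and collect the distinct tuples per group.
Dataset<Row> busComb = bus_ic
        .groupBy(hour(to_timestamp(col("time"))).as("hour"))
        .agg(collect_set(struct(
                col("currentStop"), col("lat"), col("lon"), col("speed")
        )).alias("value"));

busComb.toJSON().show(false);

collect_set deduplicates within each group, so it gives you the distinct ("currentStop","lat","lon","speed") combinations per hour directly, without 24 separate queries.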
But with only 24 groups, there is no opportunity for scaling out here. It might be an interesting exercise, but it is not something you can really apply to a large dataset, where using Spark makes sense.
You can add missing hours by joining with range:
spark.range(0, 24).toDF("hour").join(bus_comb, Seq("hour"), "leftouter")
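Hours without data come back with a null value column after the left outer join. A sketch of the same join via the Java API, continuing from the hypothetical busComb Dataset above (the coalesce with an empty array assumes a Spark version that widens array<null> to the struct array type, e.g. 2.4+):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> hours = spark.range(0, 24).toDF("hour");
Dataset<Row> allHours = hours
        .join(busComb, hours.col("hour").equalTo(busComb.col("hour")), "left_outer")
        .drop(busComb.col("hour"))                             // keep a single "hour" column
        .withColumn("value", coalesce(col("value"), array()))  // null -> [] for empty hours
        .orderBy("hour");

allHours.toJSON().collectAsList();  // one JSON object per hour, e.g. {"hour":0,"value":[]}

Joining the collected strings with commas and wrapping them in brackets then yields the full JSON array from the question.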
Upvotes: 1