Reputation: 37
I am working with Spark SQL, running some SQL operations on a Hive table. My table looks like this: ```
ID  COST  CODE
1   100   AB1
5   200   BC3
1   400   FD3
6   600   HJ2
1   900   432
3   800   DS2
2   500   JT4
```
I want to create another table out of this, with the total cost per ID and the top 5 CODEs (by cost, descending) chained together in another column, like this:
```
ID  TOTAL_COST  CODE  CODE_CHAIN
1   1400        432   432, FD3, AB1
```
The total cost is easy, but how do I concatenate the values from the CODE column to form the chain column?
I have tried the collect_set function, but the values cannot be limited to 5 and are not reliably ordered, probably due to the distributed processing.
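For reference, this is roughly what I tried (a sketch, assuming the table is registered as a temp view named `course`):
```
// collect_set returns a de-duplicated, unordered array, so there is
// no way to keep only the 5 highest-cost codes or to control their order
spark.sql("""
  select id, sum(cost) as total_cost, collect_set(code) as code_chain
  from course
  group by id
""").show(false)
```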
Is this possible with SQL logic?
EDIT:
I need the data sorted so that I get the top 5 values.
Upvotes: 0
Views: 357
Reputation: 8711
Use window functions and a WITH clause (CTE), then filter on the first row_number. Check this out:
```
scala> val df = Seq((1,100,"AB1"),(5,200,"BC3"),(1,400,"FD3"),(6,600,"HJ2"),(1,900,"432"),(3,800,"DS2"),(2,500,"JT4")).toDF("ID","COST","CODE")
df: org.apache.spark.sql.DataFrame = [ID: int, COST: int ... 1 more field]
scala> df.show()
+---+----+----+
| ID|COST|CODE|
+---+----+----+
| 1| 100| AB1|
| 5| 200| BC3|
| 1| 400| FD3|
| 6| 600| HJ2|
| 1| 900| 432|
| 3| 800| DS2|
| 2| 500| JT4|
+---+----+----+
scala> df.createOrReplaceTempView("course")
scala> spark.sql("""
     | with tab1 as (
     |   select id, cost, code,
     |     -- current row + 4 following = the 5 highest-cost rows per id
     |     collect_list(code) over (partition by id order by cost desc rows between current row and 4 following) cc,
     |     row_number() over (partition by id order by cost desc) rc,
     |     -- sum over the whole partition so the total covers all rows, not just the top 5
     |     sum(cost) over (partition by id) total
     |   from course
     | )
     | select id, total, cc from tab1 where rc = 1
     | """).show(false)
+---+-----+---------------+
|id |total|cc |
+---+-----+---------------+
|1 |1400 |[432, FD3, AB1]|
|6 |600 |[HJ2] |
|3 |800 |[DS2] |
|5 |200 |[BC3] |
|2 |500 |[JT4] |
+---+-----+---------------+
```
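If you need CODE_CHAIN as a comma-separated string rather than an array, as in the expected output, wrap the array in concat_ws. A minimal sketch, assuming the result of the query above is held in a DataFrame named `top5` (hypothetical name):
```
import org.apache.spark.sql.functions.{col, concat_ws}

// concat_ws flattens an array<string> column into one delimited string,
// e.g. [432, FD3, AB1] -> "432, FD3, AB1"
val chained = top5.withColumn("code_chain", concat_ws(", ", col("cc"))).drop("cc")
chained.show(false)
```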
Upvotes: 1
Reputation: 26
Use `slice`, `sort_array`, and `collect_list`:
```
import org.apache.spark.sql.functions._
import spark.implicits._

df
  .groupBy("id")
  .agg(
    sum("cost") as "total_cost",
    // sort the (cost, code) structs by cost descending, keep the first 5, extract code
    slice(sort_array(collect_list(struct($"cost", $"code")), false), 1, 5)("code") as "codes")
```
In Spark 2.3 you'll have to replace `slice` with manual indexing of the sorted array:
```
val sorted = sort_array(collect_list(struct($"cost", $"code")), false)("code")
// getItem(i) yields null when the array has fewer than i + 1 elements
val codes = array((0 until 5).map(i => sorted.getItem(i)): _*) as "codes"
```
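Putting the 2.3 variant together (a sketch; `df` is the input DataFrame from the question, and concat_ws skips the nulls that getItem produces for ids with fewer than 5 codes):
```
import org.apache.spark.sql.functions._
import spark.implicits._

val sorted = sort_array(collect_list(struct($"cost", $"code")), false)("code")
val codes  = array((0 until 5).map(i => sorted.getItem(i)): _*)

df.groupBy("id")
  .agg(
    sum("cost") as "total_cost",
    // join the top codes into the "432, FD3, AB1" style chain from the question
    concat_ws(", ", codes) as "code_chain")
  .show(false)
```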
Upvotes: 1