Reputation: 21
I am trying to use Spark to build a sorted, size-limited list per key from a DataFrame, but I cannot come up with an approach that is both fast and light on memory.
My DataFrame has three columns: two key ID columns and a distance column. For each ID, I want the list of the top n=50 closest IDs. I tried groupBy followed by collect_list and sort_array, then a UDF to keep only the IDs, and finally another UDF to take the first n=50, but it is very slow and sometimes fails with a memory error.
# Sample Data
val dataFrameTest = Seq(
("key1", "key2", 1),
("key1","key3", 2),
("key1", "key5" ,4),
("key1", "key6" ,5),
("key1","key8" ,6),
("key2", "key7" ,3),
("key2", "key9" ,4),
("key2","key5" ,5)
).toDF("id1", "id2", "distance")
With a limit of 2, I want:
"key1" | ["key2", "key3"]
"key2" | ["key7", "key9"]
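Current approach:
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}
// Group by the first key, collect (distance, id2) pairs, then sort them by distance.
val sortedDf = dataFrameTest.groupBy("id1").agg(collect_list(struct("distance", "id2")).alias("toBeSortedCol")).
withColumn("sortedList", sort_array(col("toBeSortedCol")))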
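For reference, the pipeline described above can be sketched end to end with built-in functions instead of UDFs. This is only a sketch under assumptions: it needs Spark 2.4 or later for `slice`, the names `baseline` and `closest` are made up for illustration, and it still collects every row for a key before trimming, so it does not address the memory problem by itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, slice, sort_array, struct}

// Standalone setup; in spark-shell these two lines can be dropped.
val spark = SparkSession.builder().master("local[*]").appName("sorted-list-baseline").getOrCreate()
import spark.implicits._

val dataFrameTest = Seq(
  ("key1", "key2", 1), ("key1", "key3", 2), ("key1", "key5", 4),
  ("key1", "key6", 5), ("key1", "key8", 6),
  ("key2", "key7", 3), ("key2", "key9", 4), ("key2", "key5", 5)
).toDF("id1", "id2", "distance")

val n = 2 // illustrative limit; the real job would use 50

val baseline = dataFrameTest
  .groupBy("id1")
  // Structs sort by their first field, so distance drives the ordering.
  .agg(sort_array(collect_list(struct(col("distance"), col("id2")))).alias("sortedList"))
  // Selecting a field of an array of structs yields an array of that field;
  // slice is 1-based and keeps the first n elements.
  .select(col("id1"), slice(col("sortedList.id2"), 1, n).alias("closest"))

baseline.show(false)
```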
My data is quite large, which is why I need to do this in Spark. I would appreciate any help or guidance.
Upvotes: 1
Views: 351
Reputation: 9425
What about using one of Spark SQL's window functions for this? Something like:
scala> val dataFrameTest = Seq(
| ("key1", "key2", 1),
| ("key1","key3", 2),
| ("key1", "key5" ,4),
| ("key1", "key6" ,5),
| ("key1","key8" ,6),
| ("key2", "key7" ,3),
| ("key2", "key9" ,4),
| ("key2","key5" ,5)
| ).toDF("id1", "id2", "distance")
dataFrameTest: org.apache.spark.sql.DataFrame = [id1: string, id2: string ... 1 more field]
scala> dataFrameTest.createOrReplaceTempView("sampledata")
scala> spark.sql("""
| select t.id1, collect_list(t.id2) from (
| select id1, id2, row_number() over (partition by id1 order by distance) as rownum from sampledata
| )t
| where t.rownum < 3 group by t.id1
| """).show(false)
+----+-----------------+
|id1 |collect_list(id2)|
+----+-----------------+
|key1|[key2, key3]     |
|key2|[key7, key9]     |
+----+-----------------+
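Just substitute row_number() with rank() or dense_rank(), depending on the type of result you need: row_number() keeps at most two rows per key, whereas rank() and dense_rank() give tied distances the same rank, so the same filter can keep more than two rows per key.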
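The same idea can also be written with the DataFrame API rather than a SQL string. The sketch below is one possible version, not the only way to do it: the names `sample`, `byDistance`, `topN` and `closest` are hypothetical, and the extra `sort_array` over `(distance, id2)` structs is an added precaution, since `collect_list` after a shuffle is not formally guaranteed to preserve the window's order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, row_number, sort_array, struct}

// Standalone setup; in spark-shell these two lines can be dropped.
val spark = SparkSession.builder().master("local[*]").appName("top-n-per-key").getOrCreate()
import spark.implicits._

val sample = Seq(
  ("key1", "key2", 1), ("key1", "key3", 2), ("key1", "key5", 4),
  ("key1", "key6", 5), ("key1", "key8", 6),
  ("key2", "key7", 3), ("key2", "key9", 4), ("key2", "key5", 5)
).toDF("id1", "id2", "distance")

val n = 2 // illustrative limit; the question uses n = 50

// Number the rows of each id1 partition by increasing distance.
val byDistance = Window.partitionBy("id1").orderBy(col("distance"))

val topN = sample
  .withColumn("rownum", row_number().over(byDistance))
  // Drop everything beyond the n closest rows before any list is built.
  .filter(col("rownum") <= n)
  .groupBy("id1")
  // Re-sort the (at most n) survivors so the final order is explicit.
  .agg(sort_array(collect_list(struct(col("distance"), col("id2")))).alias("sorted"))
  .select(col("id1"), col("sorted.id2").alias("closest"))

topN.show(false)
```

Because the filter runs before the aggregation, each collected list holds at most n elements. When pasting into spark-shell, `:paste` mode avoids problems with the lines that start with a dot.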
Upvotes: 1