Reputation: 8281
Given this DataFrame:
+---+---+
| c1| c2|
+---+---+
| A| 1|
| A| 2|
| A| 1|
| B| 3|
| B| 4|
| B| 4|
+---+---+
I want to compute, for each value of c1, the most frequent value of c2:
+---+---+
| c1| c2|
+---+---+
| A| 1|
| B| 4|
+---+---+
Here is my current code (Spark 1.6.0):
import org.apache.spark.sql.functions.{col, max, struct}

val df = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 4))).toDF("c1", "c2")

df.groupBy("c1", "c2")
  .count()
  .groupBy("c1")
  .agg(max(struct(col("count"), col("c2"))).as("max")) // structs compare field by field, so this keeps the pair with the highest count (ties broken by larger c2)
  .select("c1", "max.c2")
Is there a better way?
Upvotes: 0
Views: 128
Reputation: 17
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, max}

val df = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 4))).toDF("c1", "c2")

// Count each (c1, c2) pair over a window, then keep the rows whose count equals the per-c1 maximum.
val overCategory = Window.partitionBy($"c1", $"c2")
val countd = df.withColumn("count", count($"c2").over(overCategory)).dropDuplicates
val freqCategory = countd
  .withColumn("max", max($"count").over(Window.partitionBy($"c1")))
  .filter($"count" === $"max")
  .drop("count", "max")
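On the sample data this produces the expected result (note that if two values of c2 were tied for the highest count, both rows would be kept):

freqCategory.show()
// +---+---+
// | c1| c2|
// +---+---+
// |  A|  1|
// |  B|  4|
// +---+---+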
Upvotes: 0
Reputation: 11587
If you are comfortable using Spark SQL, the implementation below will work. Please note that window functions in Spark SQL are available from Spark 1.4 onwards.
df.registerTempTable("temp_table")

sqlContext.sql("""
  SELECT c1, c2 FROM (
    SELECT c1, c2, RANK() OVER (PARTITION BY c1 ORDER BY cnt DESC) AS rank FROM (
      SELECT c1, c2, COUNT(*) AS cnt FROM temp_table GROUP BY c1, c2
    ) t0
  ) t1
  WHERE t1.rank = 1
""").show()
Upvotes: 1