Reputation: 370
DataSet:
+---+--------+
|age| name|
+---+--------+
| 33| Will|
| 26|Jean-Luc|
| 55| Hugh|
| 40| Deanna|
| 68| Quark|
| 59| Weyoun|
| 37| Gowron|
| 54| Will|
| 38| Jadzia|
| 27| Hugh|
+---+--------+
Here is my attempt, but it just returns the length of the longest string rather than the string itself:
AgeName.groupBy("age")
  .agg(max(length(AgeName("name")))).show()
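One way to get the string itself is to take the max of a (length, name) struct, so the name travels with its length; a minimal sketch against the same AgeName DataFrame:
import org.apache.spark.sql.functions.{col, length, max, struct}

// Structs compare field by field, so max picks the longest name per age
// (ties on length fall back to lexicographic order on the name)
AgeName.groupBy("age")
  .agg(max(struct(length(col("name")).as("len"), col("name"))).as("longest"))
  .select(col("age"), col("longest.name"))
  .show()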
Upvotes: 1
Views: 520
Reputation: 42402
The usual row_number trick should work if you specify the Window correctly. Using @LeoC's example,
import org.apache.spark.sql.functions.expr
import spark.implicits._ // for toDF; already in scope in spark-shell

val df = Seq(
  (35, "John"),
  (22, "Jennifer"),
  (22, "Alexander"),
  (35, "Michelle"),
  (22, "Celia")
).toDF("age", "name")

val df2 = df.withColumn(
  "rownum",
  expr("row_number() over (partition by age order by length(name) desc)")
).filter("rownum = 1").drop("rownum")

df2.show
+---+---------+
|age| name|
+---+---------+
| 22|Alexander|
| 35| Michelle|
+---+---------+
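For reference, the same window can also be built with the DataFrame API rather than a SQL expression; a sketch over the same df (note that row_number keeps exactly one name per age and breaks length ties arbitrarily):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, length, row_number}

// Partition per age, longest name first; row_number() = 1 marks the winner
val w = Window.partitionBy("age").orderBy(length(col("name")).desc)

val df3 = df
  .withColumn("rownum", row_number().over(w))
  .filter(col("rownum") === 1)
  .drop("rownum")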
Upvotes: 2
Reputation: 59
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, length, max}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object BasicDatasetTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("BasicDatasetTest")
      .getOrCreate()

    val pairs = List(
      (33, "Will"), (26, "Jean-Luc"), (55, "Hugh"), (26, "Deanna"), (26, "Quark"),
      (55, "Weyoun"), (33, "Gowron"), (55, "Will"), (26, "Jadzia"), (27, "Hugh"))

    val schema = new StructType(Array(
      StructField("age", IntegerType, false),
      StructField("name", StringType, false)))

    val dataRDD = spark.sparkContext.parallelize(pairs).map(record => Row(record._1, record._2))
    val dataset = spark.createDataFrame(dataRDD, schema)

    // Length of each (age, name) pair; name is in the group key,
    // so max(length(name)) here is just that name's length
    val ageNameGroup = dataset.groupBy("age", "name")
      .agg(max(length(col("name"))))
      .withColumnRenamed("max(length(name))", "length")
    ageNameGroup.printSchema()

    // Longest name length per age
    val ageGroup = dataset.groupBy("age")
      .agg(max(length(col("name"))))
      .withColumnRenamed("max(length(name))", "length")
    ageGroup.printSchema()

    ageGroup.createOrReplaceTempView("age_group")
    ageNameGroup.createOrReplaceTempView("age_name_group")

    // Join the per-age max length back to the per-(age, name) lengths to recover the names
    spark.sql("select ag.age, ang.name from age_group as ag, age_name_group as ang " +
      "where ag.age = ang.age and ag.length = ang.length")
      .show()
  }
}
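The same join can be written with the DataFrame API instead of temp views and SQL; a sketch reusing ageNameGroup and ageGroup from above (ages with tied longest names yield one row per tied name):
// Rename one "length" column to avoid ambiguity, then keep rows at the per-age max
ageNameGroup
  .join(ageGroup.withColumnRenamed("length", "maxLength"), Seq("age"))
  .filter(col("length") === col("maxLength"))
  .select("age", "name")
  .show()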
Upvotes: 0
Reputation: 22449
Here's one approach using the Spark higher-order function aggregate, as shown below:
import org.apache.spark.sql.functions.{collect_list, expr}
import spark.implicits._ // for toDF; already in scope in spark-shell

val df = Seq(
  (35, "John"),
  (22, "Jennifer"),
  (22, "Alexander"),
  (35, "Michelle"),
  (22, "Celia")
).toDF("age", "name")

// Fold each age's name list into the single longest name
df.
  groupBy("age").agg(collect_list("name").as("names")).
  withColumn(
    "longest_name",
    expr("aggregate(names, '', (acc, x) -> case when length(acc) < length(x) then x else acc end)")
  ).
  show(false)
// +---+----------------------------+------------+
// |age|names |longest_name|
// +---+----------------------------+------------+
// |22 |[Jennifer, Alexander, Celia]|Alexander |
// |35 |[John, Michelle] |Michelle |
// +---+----------------------------+------------+
Note that higher-order functions are available only on Spark 2.4+.
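On older versions, a typed-Dataset reduction is one alternative; a sketch assuming the same df and spark.implicits._ in scope:
// Define the case class at top level (or in the spark-shell) so an Encoder can be derived
case class Person(age: Int, name: String)

// Reduce each age group to the record with the longest name (length ties broken arbitrarily)
df.as[Person]
  .groupByKey(_.age)
  .reduceGroups((a, b) => if (a.name.length >= b.name.length) a else b)
  .map { case (_, p) => p }
  .show(false)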
Upvotes: 2