Madhav Thaker

Reputation: 370

Scala - Return the largest string within each group

DataSet:

+---+--------+
|age|    name|
+---+--------+
| 33|    Will|
| 26|Jean-Luc|
| 55|    Hugh|
| 40|  Deanna|
| 68|   Quark|
| 59|  Weyoun|
| 37|  Gowron|
| 54|    Will|
| 38|  Jadzia|
| 27|    Hugh|
+---+--------+

Here is my attempt, but it just returns the length of the longest string rather than the string itself:

AgeName.groupBy("age")
      .agg(max(length(AgeName("name")))).show()

Upvotes: 1

Views: 520

Answers (3)

mck

Reputation: 42402

The usual row_number trick should work if you specify the Window correctly. Using @LeoC's example,

// expr comes from the functions package; toDF needs the session implicits
// (both are pre-imported in spark-shell).
import org.apache.spark.sql.functions.expr
import spark.implicits._

val df = Seq(
  (35, "John"),
  (22, "Jennifer"),
  (22, "Alexander"),
  (35, "Michelle"),
  (22, "Celia")
).toDF("age", "name")

val df2 = df.withColumn(
    "rownum", 
    expr("row_number() over (partition by age order by length(name) desc)")
).filter("rownum = 1").drop("rownum")

df2.show
+---+---------+
|age|     name|
+---+---------+
| 22|Alexander|
| 35| Michelle|
+---+---------+
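
For reference, the same thing can be written with the DataFrame Window API instead of the SQL expression; a minimal sketch with the same semantics as the expr version above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, length, row_number}

// Partition by age, rank names by descending length, keep rank 1.
val w = Window.partitionBy("age").orderBy(length(col("name")).desc)
val df2 = df.withColumn("rownum", row_number().over(w))
  .filter(col("rownum") === 1)
  .drop("rownum")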

Upvotes: 2

Mohammad Mahfooz Alam

Reputation: 59

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, length, max}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object BasicDatasetTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("BasicDatasetTest")
      .getOrCreate()

    val pairs = List(
      (33, "Will"), (26, "Jean-Luc"), (55, "Hugh"), (26, "Deanna"),
      (26, "Quark"), (55, "Weyoun"), (33, "Gowron"), (55, "Will"),
      (26, "Jadzia"), (27, "Hugh"))

    val schema = new StructType(Array(
      StructField("age", IntegerType, false),
      StructField("name", StringType, false)))

    val dataRDD = spark.sparkContext.parallelize(pairs).map(record => Row(record._1, record._2))

    val dataset = spark.createDataFrame(dataRDD, schema)

    // Length of each name; grouping by both columns keeps the name available.
    val ageNameGroup = dataset.groupBy("age", "name")
      .agg(max(length(col("name"))))
      .withColumnRenamed("max(length(name))", "length")

    ageNameGroup.printSchema()

    // Longest name length per age group.
    val ageGroup = dataset.groupBy("age")
      .agg(max(length(col("name"))))
      .withColumnRenamed("max(length(name))", "length")

    ageGroup.printSchema()

    ageGroup.createOrReplaceTempView("age_group")
    ageNameGroup.createOrReplaceTempView("age_name_group")

    // Join the two views to recover the name(s) whose length equals the max.
    spark.sql("select ag.age, ang.name from age_group as ag, age_name_group as ang " +
      "where ag.age = ang.age and ag.length = ang.length")
      .show()
  }
}
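
The join can also be avoided entirely: max over a struct compares fields left to right, so aggregating a (length, name) struct picks the name with the greatest length per group. A hypothetical join-free variant of the same idea, not part of the original answer:

import org.apache.spark.sql.functions.{col, length, max, struct}

// (len, name) structs are ordered by len first, so max yields the longest name.
val longest = dataset.groupBy("age")
  .agg(max(struct(length(col("name")).as("len"), col("name"))).as("m"))
  .select(col("age"), col("m.name").as("name"))

Note that ties on length are broken by the name's own ordering, whereas the join above returns all tied names.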

Upvotes: 0

Leo C

Reputation: 22449

Here's one approach using the Spark higher-order function aggregate, as shown below:

// collect_list and expr come from the functions package; toDF needs the
// session implicits (both are pre-imported in spark-shell).
import org.apache.spark.sql.functions.{collect_list, expr}
import spark.implicits._

val df = Seq(
  (35, "John"),
  (22, "Jennifer"),
  (22, "Alexander"),
  (35, "Michelle"),
  (22, "Celia")
).toDF("age", "name")

df.
  groupBy("age").agg(collect_list("name").as("names")).
  withColumn(
    "longest_name",
    expr("aggregate(names, '', (acc, x) -> case when length(acc) < length(x) then x else acc end)")
  ).
  show(false)
// +---+----------------------------+------------+
// |age|names                       |longest_name|
// +---+----------------------------+------------+
// |22 |[Jennifer, Alexander, Celia]|Alexander   |
// |35 |[John, Michelle]            |Michelle    |
// +---+----------------------------+------------+

Note that higher-order functions are available only on Spark 2.4+.
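
As an aside, on Spark 3.0+ the built-in max_by aggregate reaches the same result without materializing the list of names; a minimal sketch:

import org.apache.spark.sql.functions.expr

// max_by(name, length(name)) returns the name associated with the max length.
df.groupBy("age")
  .agg(expr("max_by(name, length(name))").as("longest_name"))
  .show(false)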

Upvotes: 2
