Reputation: 891
I have a DataFrame with two columns, containing data like this:
+----+-----------------+
|acct| device|
+----+-----------------+
| B| List(3, 4)|
| C| List(3, 5)|
| A| List(2, 6)|
| B|List(3, 11, 4, 9)|
| C| List(5, 6)|
| A|List(2, 10, 7, 6)|
+----+-----------------+
And I need the following result:
+----+-----------------+
|acct| device|
+----+-----------------+
| B|List(3, 4, 11, 9)|
| C| List(3, 5, 6)|
| A|List(2, 6, 7, 10)|
+----+-----------------+
I tried the following, but neither seems to work:
df.groupBy("acct").agg(concat("device"))
df.groupBy("acct").agg(collect_set("device"))
How can I achieve this in Scala?
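For reference, the requested transformation can be sketched on plain Scala collections (no Spark). This sketch assumes "merge" means concatenating each account's lists in row order and keeping only the first occurrence of each value. Because the expected row for A is not in first-seen order, element order is presumably not significant. The names MergeSemantics and mergeByAcct are hypothetical.

```scala
object MergeSemantics {
  // Hypothetical helper: group (acct, device) rows by acct, concatenate the
  // device lists in row order, and keep only the first occurrence of each value.
  def mergeByAcct(rows: Seq[(String, Seq[Int])]): Map[String, Seq[Int]] =
    rows
      .groupBy { case (acct, _) => acct }
      .map { case (acct, group) => acct -> group.flatMap(_._2).distinct }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      "B" -> Seq(3, 4),
      "C" -> Seq(3, 5),
      "A" -> Seq(2, 6),
      "B" -> Seq(3, 11, 4, 9),
      "C" -> Seq(5, 6),
      "A" -> Seq(2, 10, 7, 6)
    )
    // Print one line per account, sorted by account name
    mergeByAcct(rows).toSeq.sortBy(_._1).foreach { case (acct, devices) =>
      println(s"$acct -> $devices")
    }
  }
}
```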
Upvotes: 7
Views: 14451
Reputation: 37842
Another option, which might perform better than the explode approach, is to write your own UserDefinedAggregateFunction that merges the lists into a single list of distinct values. You'll have to extend UserDefinedAggregateFunction as follows:
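import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MergeListsUDAF extends UserDefinedAggregateFunction {
  // One input column: an array of ints
  override def inputSchema: StructType = StructType(Seq(StructField("a", ArrayType(IntegerType))))
  // The buffer holds the distinct values merged so far, in the same shape as the input
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = ArrayType(IntegerType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Seq.empty[Int])

  // Append the incoming list to the buffer and drop duplicates (null lists are skipped)
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val existing = buffer.getSeq[Int](0)
      val newList = input.getSeq[Int](0)
      buffer.update(0, (existing ++ newList).distinct)
    }
  }

  // Merging two partial buffers is the same operation as updating with a new row
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.getSeq[Int](0)
}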
And use it like so:
import spark.implicits._ // for the $"..." column syntax

val mergeUDAF = new MergeListsUDAF()
df.groupBy("acct").agg(mergeUDAF($"device").as("device"))
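UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of typed Aggregator instances registered through functions.udaf. Below is a minimal sketch of the same merge written as an Aggregator, assuming Spark 3.0+. The names MergeListsAgg and MergeListsAggUsage are hypothetical. ExpressionEncoder is an internal Catalyst class, used here only to obtain an encoder for Seq[Int].

```scala
import org.apache.spark.sql.{DataFrame, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Typed counterpart of MergeListsUDAF: input, buffer and output are all Seq[Int]
object MergeListsAgg extends Aggregator[Seq[Int], Seq[Int], Seq[Int]] {
  // Empty buffer for each group
  def zero: Seq[Int] = Seq.empty
  // Fold one input list into the buffer; a null array value is skipped
  def reduce(buffer: Seq[Int], input: Seq[Int]): Seq[Int] =
    if (input == null) buffer else (buffer ++ input).distinct
  // Combine two partial buffers computed on different partitions
  def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = (b1 ++ b2).distinct
  def finish(buffer: Seq[Int]): Seq[Int] = buffer
  // Assumption: ExpressionEncoder (internal API) provides an array<int> encoder
  def bufferEncoder: Encoder[Seq[Int]] = ExpressionEncoder[Seq[Int]]()
  def outputEncoder: Encoder[Seq[Int]] = ExpressionEncoder[Seq[Int]]()
}

object MergeListsAggUsage {
  // Hypothetical helper: wrap the aggregator as an untyped UDF and apply it per account
  def mergeDevices(df: DataFrame): DataFrame = {
    val mergeLists = udaf(MergeListsAgg)
    df.groupBy("acct").agg(mergeLists(col("device")).as("device"))
  }
}
```

Because reduce and merge both concatenate and then call distinct, partial results can be combined in any order. Only the order of elements in the output depends on that order.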
Upvotes: 3
Reputation: 1109
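You can try using collect_set with a Window. In your case:
df.withColumn("device", collect_set("device").over(Window.partitionBy("acct")))
Note that, as written, this keeps one row per input row. Because device is itself an array column, collect_set also returns an array of arrays (for B, one containing both [3, 4] and [3, 11, 4, 9]) rather than a single flat list.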
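Here is a sketch of a window-based variant that ends with one flat, de-duplicated list per account. It assumes Spark 2.4+ for flatten and array_distinct. WindowMerge and mergeWithWindow are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array_distinct, col, collect_list, flatten}

object WindowMerge {
  // Hypothetical helper; requires Spark 2.4+ for flatten and array_distinct
  def mergeWithWindow(df: DataFrame): DataFrame = {
    // No orderBy, so the window frame is the whole acct partition
    val byAcct = Window.partitionBy("acct")
    df
      // Gather every device list of the account into array<array<int>>,
      // flatten it to array<int>, then remove duplicate values
      .withColumn("device", array_distinct(flatten(collect_list(col("device")).over(byAcct))))
      // The window keeps one row per input row, so collapse to one row per account
      .dropDuplicates("acct")
  }
}
```

Since the window still produces a row for every input row before dropDuplicates, a plain groupBy is usually the simpler choice when only the aggregated rows are needed.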
Upvotes: 0
Reputation: 37842
You can start by exploding the device column and then continue as you did. Note that this might not preserve the order of the lists, which isn't guaranteed in any group-by anyway:
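import org.apache.spark.sql.functions.{collect_set, explode}
import spark.implicits._ // for the $"..." column syntax

val result = df.withColumn("device", explode($"device"))
  .groupBy("acct")
  .agg(collect_set("device"))
result.show(truncate = false)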
// +----+-------------------+
// |acct|collect_set(device)|
// +----+-------------------+
// |B |[9, 3, 4, 11] |
// |C |[5, 6, 3] |
// |A |[2, 6, 10, 7] |
// +----+-------------------+
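On Spark 2.4 or later, the explode step can be skipped by collecting the arrays directly and flattening them with built-in functions. The following is a minimal sketch assuming Spark 2.4+. BuiltinMerge and mergeDevices are hypothetical names. As with collect_set, the element order in the result is not guaranteed.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array_distinct, col, collect_list, flatten}

object BuiltinMerge {
  // Hypothetical helper; requires Spark 2.4+ for flatten and array_distinct
  def mergeDevices(df: DataFrame): DataFrame =
    df.groupBy("acct")
      // collect_list gathers each account's arrays into array<array<int>>;
      // flatten concatenates them and array_distinct drops repeated values
      .agg(array_distinct(flatten(collect_list(col("device")))).as("device"))
}
```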
Upvotes: 6