I have a DataFrame that has the following schema:
root
|-- memberId: long (nullable = true)
|-- items: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: long (nullable = true)
| | |-- itemId: integer (nullable = true)
| | |-- weight: double (nullable = true)
Say the DataFrame (called df) looks like this:
+-----------+------------------------------------------------------------------------+
|memberId |items |
+-----------+------------------------------------------------------------------------+
|10000000001|[[1234567891, 104, 1.0], [1234567892, 103, 3.0]] |
|10000000002|[[1234567891, 103, 1.0], [1234567893, 102, 1.0], [1234567894, 101, 2.0]]|
+-----------+------------------------------------------------------------------------+
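To reproduce the question, here is a minimal sketch that builds df from the sample rows above. It assumes a local SparkSession and a hypothetical case class Item that mirrors the element struct, and is written as a spark-shell style script. One difference from the printed schema: memberId comes out non-nullable because it is a primitive Long.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the element struct (timestamp, itemId, weight)
case class Item(timestamp: Long, itemId: Int, weight: Double)

val spark = SparkSession.builder().master("local[*]").appName("last-item-repro").getOrCreate()
import spark.implicits._

// Sample data from the question
val df = Seq(
  (10000000001L, Seq(Item(1234567891L, 104, 1.0), Item(1234567892L, 103, 3.0))),
  (10000000002L, Seq(Item(1234567891L, 103, 1.0), Item(1234567893L, 102, 1.0), Item(1234567894L, 101, 2.0)))
).toDF("memberId", "items")

df.printSchema()
df.show(false)
```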
As can be seen, df maps each memberId to a list of structs. I want to transform it so that I retrieve the last element in the list of structs for each member. The resulting DataFrame should look like this:
+-----------+----------------------+
|memberId |lastItem |
+-----------+----------------------+
|10000000001|[1234567892, 103, 3.0]|
|10000000002|[1234567894, 101, 2.0]|
+-----------+----------------------+
I tried this:
val newDf = df
.withColumn("lastItem", last($"items"))
.drop("items")
But this just throws an exception of the form:
grouping expressions sequence is empty,
and '`memberId`' is not an aggregate function.
Wrap '(last(`items`, false) AS `item`)' in
windowing function(s) or wrap '`memberId`' in
first() (or first_value) if you don't care which value you get
I believe this happens because last is an aggregate function and requires me to call .groupBy("memberId") before I invoke it.
How can I do this? Using UDFs is discouraged when working with DataFrames, but I can't find a native function that does what I want.
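If you do want to go through groupBy, note that last after an explode is not reliable, because Spark does not guarantee row order within a group after a shuffle. A minimal sketch of a deterministic aggregation route, assuming the same hypothetical Item case class and local SparkSession: posexplode keeps each element's index, and max over struct(pos, col) picks the element with the highest index.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, posexplode, struct}

// Hypothetical case class mirroring the element struct
case class Item(timestamp: Long, itemId: Int, weight: Double)

val spark = SparkSession.builder().master("local[*]").appName("last-item-groupby").getOrCreate()
import spark.implicits._

val df = Seq(
  (10000000001L, Seq(Item(1234567891L, 104, 1.0), Item(1234567892L, 103, 3.0))),
  (10000000002L, Seq(Item(1234567891L, 103, 1.0), Item(1234567893L, 102, 1.0), Item(1234567894L, 101, 2.0)))
).toDF("memberId", "items")

// posexplode emits one row per element: `pos` is the 0-based index, `col` is the struct
val exploded = df.select($"memberId", posexplode($"items"))

// Structs compare field by field, so max(struct(pos, col)) is the pair with the highest index
val lastDf = exploded
  .groupBy("memberId")
  .agg(max(struct($"pos", $"col")).as("maxPair"))
  .select($"memberId", $"maxPair.col".as("lastItem"))

lastDf.show(false)
```

Members whose items array is empty or null produce no exploded rows, so they are missing from lastDf.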
You can do this using the apply method on a Column of array type, which lets you access array elements by (0-based) index:
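import org.apache.spark.sql.functions.size
import spark.implicits._ // for the $"..." column syntax
val newDf = df
  .withColumn("lastItem", $"items"(size($"items") - 1)) // array indices are 0-based
  .drop("items")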
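A self-contained sketch of the approach above, assuming a local SparkSession and a hypothetical Item case class for the sample data. It also shows element_at, which (assuming Spark 2.4 or later) takes a 1-based index and counts from the end when the index is negative, so no size arithmetic is needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{element_at, size}

// Hypothetical case class mirroring the element struct
case class Item(timestamp: Long, itemId: Int, weight: Double)

val spark = SparkSession.builder().master("local[*]").appName("last-item-apply").getOrCreate()
import spark.implicits._

val df = Seq(
  (10000000001L, Seq(Item(1234567891L, 104, 1.0), Item(1234567892L, 103, 3.0))),
  (10000000002L, Seq(Item(1234567891L, 103, 1.0), Item(1234567893L, 102, 1.0), Item(1234567894L, 101, 2.0)))
).toDF("memberId", "items")

// Column.apply with a computed 0-based index
val viaApply = df
  .withColumn("lastItem", $"items"(size($"items") - 1))
  .drop("items")

// Spark 2.4+: element_at is 1-based, and -1 means the last element
val viaElementAt = df
  .withColumn("lastItem", element_at($"items", -1))
  .drop("items")

viaApply.show(false)
viaElementAt.show(false)
```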
Edit: to get the first n-1 items, I would use a UDF:
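import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, size, udf}
// Keeps elements [from, to); output struct fields are named _1, _2, _3 (a null element would throw a MatchError)
val sliceUDF = udf((arr: Seq[Row], from: Int, to: Int) =>
  arr.slice(from, to).map { case Row(ts: Long, id: Int, w: Double) => (ts, id, w) })
val newDf = df
  .withColumn("subItems", sliceUDF($"items", lit(0), size($"items") - 1))
  .drop("items")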
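Assuming Spark 2.4 or later, the built-in SQL function slice(array, start, length) can likely replace the UDF and keeps the original struct field names. A minimal sketch with a local SparkSession and a hypothetical Item case class; it goes through expr because the Scala slice overload that takes Column arguments only exists in newer releases.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Hypothetical case class mirroring the element struct
case class Item(timestamp: Long, itemId: Int, weight: Double)

val spark = SparkSession.builder().master("local[*]").appName("first-n-minus-1-slice").getOrCreate()
import spark.implicits._

val df = Seq(
  (10000000001L, Seq(Item(1234567891L, 104, 1.0), Item(1234567892L, 103, 3.0))),
  (10000000002L, Seq(Item(1234567891L, 103, 1.0), Item(1234567893L, 102, 1.0), Item(1234567894L, 101, 2.0)))
).toDF("memberId", "items")

// start is 1-based; greatest(..., 0) keeps the length non-negative for empty arrays,
// because slice raises an error on a negative length
val newDf = df
  .withColumn("subItems", expr("slice(items, 1, greatest(size(items) - 1, 0))"))
  .drop("items")

newDf.show(false)
```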
Maybe it could also be done using the pure DataFrame API, but I think it would be rather complicated (e.g. using a combination of posexplode, a window function and collect_list).
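The combination mentioned above could look roughly like this. It is a sketch under assumptions (local SparkSession, hypothetical Item case class), not necessarily the combination the answer had in mind: posexplode keeps each element's index, a window computes each member's highest index, and the remaining elements are re-collected. Since collect_list gives no ordering guarantee, the (pos, col) pairs are sorted before the structs are extracted.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, max, posexplode, sort_array, struct}

// Hypothetical case class mirroring the element struct
case class Item(timestamp: Long, itemId: Int, weight: Double)

val spark = SparkSession.builder().master("local[*]").appName("first-n-minus-1-window").getOrCreate()
import spark.implicits._

val df = Seq(
  (10000000001L, Seq(Item(1234567891L, 104, 1.0), Item(1234567892L, 103, 3.0))),
  (10000000002L, Seq(Item(1234567891L, 103, 1.0), Item(1234567893L, 102, 1.0), Item(1234567894L, 101, 2.0)))
).toDF("memberId", "items")

val byMember = Window.partitionBy("memberId")

val subItems = df
  .select($"memberId", posexplode($"items"))        // columns: memberId, pos, col
  .withColumn("maxPos", max($"pos").over(byMember))  // index of each member's last element
  .where($"pos" < $"maxPos")                         // drop the last element
  .groupBy("memberId")
  .agg(sort_array(collect_list(struct($"pos", $"col"))).as("pairs"))
  .select($"memberId", $"pairs.col".as("subItems"))  // array of the original structs, in order

subItems.show(false)
```

Members with zero or one element end up with no rows after the filter, so they are missing from the result rather than getting an empty array. That is one reason the built-in or UDF approaches are simpler.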