Nik
Nik

Reputation: 5745

Get last element of list in Spark Dataframe column

I have a DataFrame that has the following schema.

root
 |-- memberId: long (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: long (nullable = true)
 |    |    |-- itemId: integer (nullable = true)
 |    |    |-- weight: double (nullable = true)

Say, the DataFrame (called df) looks like this.

+-----------+------------------------------------------------------------------------+
|memberId   |items                                                                   |
+-----------+------------------------------------------------------------------------+
|10000000001|[[1234567891, 104, 1.0], [1234567892, 103, 3.0]]                        |
|10000000002|[[1234567891, 103, 1.0], [1234567893, 102, 1.0], [1234567894, 101, 2.0]]|
+-----------+------------------------------------------------------------------------+

As can be seen, the df is a map of memberId to a list of structs. I want to transform it such that I retrieve the last element in the list of structs corresponding to each member. So, the resulting DataFrame should look like

+-----------+----------------------+
|memberId   |lastItem              |
+-----------+----------------------+
|10000000001|[1234567892, 103, 3.0]|
|10000000002|[1234567894, 101, 2.0]|
+-----------+----------------------+

I tried this

val newDf = df
  .withColumn("lastItem", last($"items"))
  .drop("items")

But this just throws an exception of the form:

grouping expressions sequence is empty, 
and '`memberId`' is not an aggregate function. 
Wrap '(last(`items`, false) AS `item`)' in 
windowing function(s) or wrap '`memberId`' in 
first() (or first_value) if you don't care which value you get

I believe this happens because last is an aggregation function and requires me to .groupBy("memberId") before I invoke last.

How can I do this? Use of UDFs is not encouraged when working with DataFrames but I can't find a native function that can do what I intend to do.

Upvotes: 1

Views: 4054

Answers (1)

Raphael Roth
Raphael Roth

Reputation: 27373

You can do this using apply method on Column of type array, with which you can access array elements:

val newDf = df
.withColumn("lastItem", $"items"(size($"items")-1))
.drop("items")

Edit:

to get the first n-1 items, I would use an UDF :

val sliceUDF = udf((arr:Seq[Row],from:Int,to:Int) => arr.slice(from,to).map{case Row(ts:Long,Id:Int,w:Double) => (ts,Id,w)})

val newDf = df
  .withColumn("subItems", sliceUDF($"items",lit(0),size($"items")-1))
  .drop("items")

Maybe it could also be done using pure DataFrame API, but I think it would be rather complicated (e.g. using a combination of posexplode, window-function and collect_list)

Upvotes: 2

Related Questions