How can I use Scala/Spark to find the latest/maximum value of a string that also contains integers?
I need to assign the latest device to each telephone number, and I am using the max function for it. The problem is that max compares the values alphabetically, but I need them compared according to the version of the device.
Here is an example:
+-------------------------------------------------+-------------+
|device |phone_number |
+-------------------------------------------------+-------------+
|device/ASUS_X00TD-10 Google/c34v0v005-34.0 |+7578869987 |
|device/ASUS_X00TD-9 Google/c32v0v015-32.0 |+7578869987 |
|device/ASUS_X00TD-6 Google/c06v0v003-6.0 |+7578869987 |
|device/Samsung/SM-A510F-SF3 Samsung-RCS/6.0 |+0988768769 |
|device/Samsung/SM-A530F-XXSACTB2 Samsung-RCS/6.0 |+0988768769 |
+-------------------------------------------------+-------------+
With the max function, the latest device is "device/ASUS_X00TD-9 Google/c32v0v015-32.0", but according to the version (34.0) the right answer is "device/ASUS_X00TD-10 Google/c34v0v005-34.0". As you can see, there are also values in other formats, which need to be handled too.
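The -9 result comes from max comparing strings lexicographically, character by character. For plain ASCII values like these, Scala's own String ordering gives the same answer, so the effect can be reproduced without Spark. A minimal sketch (the object name is hypothetical):

```scala
object LexicographicMaxDemo {
  val devices = Seq(
    "device/ASUS_X00TD-10 Google/c34v0v005-34.0",
    "device/ASUS_X00TD-9 Google/c32v0v015-32.0",
    "device/ASUS_X00TD-6 Google/c06v0v003-6.0"
  )

  // After the common prefix "device/ASUS_X00TD-", the next characters are
  // '1' (from "-10"), '9' (from "-9") and '6' (from "-6"); '9' is the greatest,
  // so the "-9" device wins even though its version is lower.
  val latest: String = devices.max

  def main(args: Array[String]): Unit = println(latest)
}
```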
The desired output is:
+-------------------------------------------------+-------------+
|latest_device |phone_number |
+-------------------------------------------------+-------------+
|device/ASUS_X00TD-10 Google/c34v0v005-34.0 |+7578869987 |
|device/Samsung/SM-A530F-XXSACTB2 Samsung-RCS/6.0 |+0988768769 |
+-------------------------------------------------+-------------+
The important part of the code looks like this:
import org.apache.spark.sql.functions.max

val dfRMT1 = df
  .groupBy("phone_number")
  .agg(max("device").alias("latest_device"))
Do you have any idea how I can do this in Scala/Spark?
As you discovered, you can't pass a specific ordering to aggregation functions such as max. You should instead create a column in your dataframe whose natural ordering returns the expected result.
In your example, the version number is the last characters of each device: 34.0, 32.0, 6.0, etc. So you can extract this version number with regexp_extract using the regexp \\d+\\.\\d+$, create a struct column holding the version number and the device, take the max of this column, and then read the device field from the result to get the latest device.
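To check what the regexp extracts from each sample device, the same pattern can be tried with Scala's standard Regex. This is a plain-Scala sketch, not Spark: the object name is hypothetical, and unlike Spark's regexp_extract (which returns an empty string when nothing matches), findFirstIn returns None.

```scala
object VersionRegexDemo {
  // Same pattern as in the Spark code; a triple-quoted string needs no escaping.
  val VersionAtEnd = """\d+\.\d+$""".r

  val devices = Seq(
    "device/ASUS_X00TD-10 Google/c34v0v005-34.0",
    "device/ASUS_X00TD-9 Google/c32v0v015-32.0",
    "device/ASUS_X00TD-6 Google/c06v0v003-6.0",
    "device/Samsung/SM-A510F-SF3 Samsung-RCS/6.0",
    "device/Samsung/SM-A530F-XXSACTB2 Samsung-RCS/6.0"
  )

  // None means the device string does not end with "<digits>.<digits>".
  val versions: Seq[Option[String]] = devices.map(d => VersionAtEnd.findFirstIn(d))

  def main(args: Array[String]): Unit =
    devices.zip(versions).foreach { case (d, v) =>
      println(s"${v.getOrElse("<no version>")} <- $d")
    }
}
```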
So you get the code below:
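import org.apache.spark.sql.functions.{struct, regexp_extract, col, max}
import org.apache.spark.sql.types.FloatType

// Pair each device with the version number at the end of its string ("34.0" -> 34.0).
// Structs are compared field by field, so "version" decides first and "device" breaks ties.
val dfRMT1 = df
  .withColumn("device_with_version", struct(
    regexp_extract(col("device"), "\\d+\\.\\d+$", 0).cast(FloatType).as("version"),
    col("device")
  ))
  .groupBy("phone_number")
  .agg(max("device_with_version").getField("device").alias("latest_device")) // keep only the device of the max struct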
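This selects the device with the greatest version. If two devices share the same version, the tie is broken by the device string itself, so the last one in alphanumeric order wins. In your example this happens with the Samsung devices: both end in 6.0, and SM-A530F sorts after SM-A510F.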
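Scala tuples are ordered field by field, like Spark structs, so the whole groupBy/max logic can be mirrored in plain Scala to check the expected result, including the tie-break where the greater device string wins. A sketch under stated assumptions: (Option[BigDecimal], String) stands in for the device_with_version struct, BigDecimal replaces the FloatType cast, and None (no trailing version) sorts lowest, assumed here to approximate a null version in Spark. The object and method names are hypothetical.

```scala
object LatestDevicePerPhone {
  private val VersionAtEnd = """\d+\.\d+$""".r

  val rows: Seq[(String, String)] = Seq(
    ("device/ASUS_X00TD-10 Google/c34v0v005-34.0", "+7578869987"),
    ("device/ASUS_X00TD-9 Google/c32v0v015-32.0", "+7578869987"),
    ("device/ASUS_X00TD-6 Google/c06v0v003-6.0", "+7578869987"),
    ("device/Samsung/SM-A510F-SF3 Samsung-RCS/6.0", "+0988768769"),
    ("device/Samsung/SM-A530F-XXSACTB2 Samsung-RCS/6.0", "+0988768769")
  )

  // (version, device): tuples compare the version first, then the device on a tie.
  // None (no trailing version) sorts before any Some(...).
  def versionKey(device: String): (Option[BigDecimal], String) =
    (VersionAtEnd.findFirstIn(device).map(s => BigDecimal(s)), device)

  // Equivalent of groupBy("phone_number") + max(struct) + getField("device").
  val latestDevice: Map[String, String] =
    rows
      .groupBy(_._2)
      .map { case (phone, group) => phone -> group.map(row => versionKey(row._1)).max._2 }

  def main(args: Array[String]): Unit =
    latestDevice.toSeq.sorted.foreach { case (phone, device) => println(s"$phone -> $device") }
}
```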
Running this code with your example as input gives:
+------------+------------------------------------------------+
|phone_number|latest_device |
+------------+------------------------------------------------+
|+0988768769 |device/Samsung/SM-A530F-XXSACTB2 Samsung-RCS/6.0|
|+7578869987 |device/ASUS_X00TD-10 Google/c34v0v005-34.0 |
+------------+------------------------------------------------+