Rachana Gandhi

Reputation: 66

How do you split a column such that the first half becomes the column name and the second the column value in Scala Spark?

I have a column whose values look like this:

+----------------------+-----------------------------------------+
|UserId                |col                                      |
+----------------------+-----------------------------------------+
|1                     |firstname=abc                            |
|2                     |lastname=xyz                             |
|3                     |firstname=pqr;lastname=zzz               |
|4                     |firstname=aaa;middlename=xxx;lastname=bbb|
+----------------------+-----------------------------------------+

and what I want is something like this:

+------+---------+--------+----------+
|UserId|firstname|lastname|middlename|
+------+---------+--------+----------+
|1     |abc      |null    |null      |
|2     |null     |xyz     |null      |
|3     |pqr      |zzz     |null      |
|4     |aaa      |bbb     |xxx       |
+------+---------+--------+----------+

I have already done this:

var new_df = df.withColumn("temp_new", split(col("col"), "\\;")).select(
     (0 until numCols).map(i => split(col("temp_new").getItem(i), "=").getItem(1).as(s"col$i")): _*
)

where numCols is the maximum number of key/value pairs in col.
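(numCols isn't shown being computed; a minimal sketch of one way it could be derived, assuming df is the input DataFrame:)

import org.apache.spark.sql.functions._

// Count the key=value pairs per row and take the maximum across the column.
val numCols = df
  .select(size(split(col("col"), ";")).as("n"))
  .agg(max(col("n")))
  .head()
  .getInt(0)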

but, as you may have guessed, I get something like this as the output:

+------+----+----+----+
|UserId|col0|col1|col2|
+------+----+----+----+
|1     |abc |null|null|
|2     |xyz |null|null|
|3     |pqr |zzz |null|
|4     |aaa |xxx |bbb |
+------+----+----+----+

NOTE: The above is just an example. There could be more additions to the columns, like firstname=aaa;middlename=xxx;lastname=bbb;age=20;country=India and so on, for around 40-50 column names and values. They are dynamic, and I don't know most of them in advance.

I am looking for a way to achieve this result with Scala in Spark.

Upvotes: 1

Views: 182

Answers (3)

Leo C

Reputation: 22439

This solution is proposed in accordance with the expanded requirements described in the comments section of the other answer:

  1. Existence of duplicate keys in column key_values
  2. Only duplicate key columns will be aggregated as ArrayType

There are probably other approaches. The solution below uses groupBy/pivot with collect_list, followed by extracting the single element (null if empty) from the non-duplicate key columns.

val df = Seq(
  (1, "firstname=joe;age=33;moviegenre=comedy"),
  (2, "lastname=smith;country=usa;moviegenre=drama"),
  (3, "firstname=zoe;lastname=cooper;age=44;country=aus"),
  (4, "firstname=john;lastname=doe;moviegenre=drama;moviegenre=comedy")
).toDF("user_id", "key_values")

val mainCols = df.columns diff Seq("key_values")

val dfNew = df.
  withColumn("kv_arr", split($"key_values", ";")).
  withColumn("kv", explode(expr("transform(kv_arr, kv -> split(kv, '='))"))).
  groupBy("user_id").pivot($"kv"(0)).agg(collect_list($"kv"(1)))

val dupeKeys = Seq("moviegenre")  // user-provided
val nonDupeKeys = dfNew.columns diff (mainCols ++ dupeKeys)

dfNew.select(
    mainCols.map(col) ++
    dupeKeys.map(col) ++
    nonDupeKeys.map(k => when(size(col(k)) > 0, col(k)(0)).as(k)): _*
  ).
  orderBy("user_id").  // only for ordered output
  show
/*
+-------+---------------+----+-------+---------+--------+
|user_id|     moviegenre| age|country|firstname|lastname|
+-------+---------------+----+-------+---------+--------+
|      1|       [comedy]|  33|   null|      joe|    null|
|      2|        [drama]|null|    usa|     null|   smith|
|      3|             []|  44|    aus|      zoe|  cooper|
|      4|[drama, comedy]|null|   null|     john|     doe|
+-------+---------------+----+-------+---------+--------+
*/

Note that the higher-order function transform is used to handle the key/value split, since the SQL function str_to_map (used in the original solution) can't handle duplicate keys.
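To see the limitation for yourself, here is a quick check (behavior depends on the Spark version and the spark.sql.mapKeyDedupPolicy setting; either way, at most one value per key survives):

// Either fails with a duplicate-map-key error or yields a single-entry map,
// losing one of the two genres -- which is why transform is used above.
spark.sql(
  "SELECT str_to_map('moviegenre=drama;moviegenre=comedy', ';', '=') AS m"
).show(false)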

Upvotes: 0

Leo C

Reputation: 22439

You could apply groupBy/pivot to generate the key columns after converting the key/value-pairs string column into a Map column via the SQL function str_to_map, as shown below:

val df = Seq(
  (1, "firstname=joe;age=33"),
  (2, "lastname=smith;country=usa"),
  (3, "firstname=zoe;lastname=cooper;age=44;country=aus"),
  (4, "firstname=john;lastname=doe")
).toDF("user_id", "key_values")

df.
  select($"user_id", explode(expr("str_to_map(key_values, ';', '=')"))).
  groupBy("user_id").pivot("key").agg(first("value").as("value")).
  orderBy("user_id").  // only for ordered output
  show
/*
+-------+----+-------+---------+--------+
|user_id| age|country|firstname|lastname|
+-------+----+-------+---------+--------+
|      1|  33|   null|      joe|    null|
|      2|null|    usa|     null|   smith|
|      3|  44|    aus|      zoe|  cooper|
|      4|null|   null|     john|     doe|
+-------+----+-------+---------+--------+
*/
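For reference, the same approach mapped onto the question's original column names (UserId / col); a sketch along the same lines, untested against the asker's data:

df.
  select($"UserId", explode(expr("str_to_map(col, ';', '=')"))).
  groupBy("UserId").pivot("key").agg(first("value")).
  show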

Upvotes: 3

ggordon

Reputation: 10035

Since your data is split by ; and your key/value pairs are split by =, you may consider using str_to_map as follows:

  1. Create a temporary view of your data, e.g.

df.createOrReplaceTempView("my_table")

  2. Run the following on your Spark session:

val result_df = sparkSession.sql("""
WITH split_data AS (
    SELECT
        UserId,
        str_to_map(col, ';', '=') full_name
    FROM
        my_table
)
SELECT
    UserId,
    full_name['firstname'] AS firstname,
    full_name['lastname'] AS lastname,
    full_name['middlename'] AS middlename
FROM
    split_data
""")
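Since the question mentions 40-50 dynamic, mostly unknown keys, the projection could also be built from the data rather than hardcoded; a rough sketch (assumes key names contain no backticks or quotes):

// Collect the distinct key names present in the data.
val keys = sparkSession.sql(
  "SELECT DISTINCT k FROM (SELECT explode(map_keys(str_to_map(col, ';', '='))) AS k FROM my_table) t"
).collect().map(_.getString(0)).sorted

// Build the SELECT list from the discovered keys and run the same query shape.
val projection = keys.map(k => s"full_name['$k'] AS `$k`").mkString(", ")
val result_df = sparkSession.sql(
  s"WITH split_data AS (SELECT UserId, str_to_map(col, ';', '=') full_name FROM my_table) " +
  s"SELECT UserId, $projection FROM split_data"
)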

Upvotes: 0
