Steve Cardella

Reputation: 57

Spark Scala - concatenate column in rows in a certain order

I'm building a metadata-driven ETL framework using Spark (Scala). I have a metadata table that holds the column definitions and their ordinal positions for each target table.

I have to build a CREATE TABLE statement from that data. Not a big deal, right? I tried what appears to be the standard answer:

var metadatadef = spark.sql("SELECT tablename, columnname, datatype, ordinalposition FROM metadata")
    .withColumn("columndef", concat($"columnname", lit(" "), $"datatype"))
    .sort($"tablename", $"ordinalposition")
    .groupBy("tablename")
    .agg(concat_ws(", ", collect_list($"columndef")).as("columndefs"))

But the sort() call seems to be ignored here, or the rows get reshuffled somewhere between collect_list() and concat_ws(). Given source data like this:

+-----------+--------------+---------------+-----------------+
| tablename | columnname   | datatype      | ordinalposition |
+-----------+--------------+---------------+-----------------+
| table1    | IntColumn    | int           | 0               |
| table2    | StringColumn | string        | 2               |
| table1    | StringColumn | string        | 2               |
| table2    | IntColumn    | int           | 0               |
| table1    | DecColumn    | decimal(15,2) | 1               |
| table2    | DecColumn    | decimal(15,2) | 1               |
+-----------+--------------+---------------+-----------------+

I am trying to get output like this:

+-----------+----------------------------------------------------------------+
| tablename | columndefs                                                     |
+-----------+----------------------------------------------------------------+
| table1    | IntColumn int, DecColumn decimal(15,2), StringColumn string    |
| table2    | IntColumn int, DecColumn decimal(15,2), StringColumn string    |
+-----------+----------------------------------------------------------------+

Instead, I wind up with something like this:

+-----------+----------------------------------------------------------------+
| tablename | columndefs                                                     |
+-----------+----------------------------------------------------------------+
| table1    | IntColumn int, StringColumn string, DecColumn decimal(15,2)    |
| table2    | StringColumn string, IntColumn int, DecColumn decimal(15,2)    |
+-----------+----------------------------------------------------------------+

Do I need to build a UDF to ensure I get proper order? I need the output to end up in a dataframe for comparison purposes, not just building the CREATE TABLE statement.

Upvotes: 0

Views: 2185

Answers (1)

Leo C

Reputation: 22449

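A sort before groupBy isn't guaranteed to survive the shuffle that groupBy triggers, and collect_list makes no promise about element order. Instead, you can create a struct column of (ordinalposition, columndef) and apply sort_array to the collected list. sort_array orders the structs by their first field, ordinalposition, so the aggregated columndef values come out in the wanted order during the groupBy transformation as follows: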

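import org.apache.spark.sql.functions._
import spark.implicits._  // for toDF and $"..."; already in scope in spark-shell

// ordinalposition is numeric here so that position 10 sorts after position 2;
// if your column is a string, cast it to int before building the struct
val df = Seq(
  ("table1", "IntColumn", "int", 0),
  ("table2", "StringColumn", "string", 2),
  ("table1", "StringColumn", "string", 2),
  ("table2", "IntColumn", "int", 0),
  ("table1", "DecColumn", "decimal(15,2)", 1),
  ("table2", "DecColumn", "decimal(15,2)", 1)
).toDF("tablename", "columnname", "datatype", "ordinalposition")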

df.
  withColumn("columndef",
    struct($"ordinalposition", concat($"columnname", lit(" "), $"datatype").as("cdef"))
  ).
  groupBy("tablename").agg(sort_array(collect_list($"columndef")).as("sortedlist")).
  withColumn("columndefs", concat_ws(", ", $"sortedlist.cdef")).
  drop("sortedlist").
  show(false)
// +---------+-----------------------------------------------------------+
// |tablename|columndefs                                                 |
// +---------+-----------------------------------------------------------+
// |table2   |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// |table1   |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// +---------+-----------------------------------------------------------+
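
Since the end goal is a CREATE TABLE statement, here is a rough sketch of how the same pattern could be extended. It assumes ordinalposition might be stored as a string (hence the cast to int, so that "10" does not sort before "2") and that `spark` is a SparkSession already in scope, as in spark-shell. The sample rows, the `pos` field name, and the `createstmt` column are made up for illustration and are not part of the question's schema.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes `spark` is an existing SparkSession

// Hypothetical sample rows with string ordinal positions, including one >= 10
val meta = Seq(
  ("table1", "ColA", "int", "2"),
  ("table1", "ColB", "string", "10"),
  ("table1", "ColC", "int", "1")
).toDF("tablename", "columnname", "datatype", "ordinalposition")

val ddl = meta.
  withColumn("columndef",
    struct(
      $"ordinalposition".cast("int").as("pos"),  // numeric sort key
      concat($"columnname", lit(" "), $"datatype").as("cdef"))
  ).
  groupBy("tablename").
  agg(sort_array(collect_list($"columndef")).as("sortedlist")).
  select($"tablename", concat_ws(", ", $"sortedlist.cdef").as("columndefs")).
  // Wrap the ordered column list in the statement text
  withColumn("createstmt",
    concat(lit("CREATE TABLE "), $"tablename", lit(" ("), $"columndefs", lit(")")))

ddl.show(false)
// createstmt for table1: CREATE TABLE table1 (ColC int, ColA int, ColB string)
```

The result stays a dataframe, so columndefs is still available for comparison and createstmt can be collected when the statement needs to be executed.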

Upvotes: 1
