I'm building a metadata-driven ETL framework in Spark (Scala). I have a metadata table that holds, for each table, its column definitions and their corresponding ordinal positions (sample data below).
I have to build a CREATE TABLE statement from that data. Not a big deal, right? I tried what appears to be the standard answer:
import org.apache.spark.sql.functions._
import spark.implicits._

val metadatadef = spark.sql("SELECT tablename, columnname, datatype, ordinalposition FROM metadata")
  .withColumn("columndef", concat($"columnname", lit(" "), $"datatype"))
  .sort($"tablename", $"ordinalposition")
  .groupBy("tablename")
  .agg(concat_ws(", ", collect_list($"columndef")).as("columndefs"))
But the sort() call seems to be ignored here, or the rows get reshuffled between sort() and collect_list() (Spark does not guarantee row order within groups after the shuffle that groupBy triggers). Given source data like this:
+-----------+--------------+---------------+-----------------+
| tablename | columnname | datatype | ordinalposition |
+-----------+--------------+---------------+-----------------+
| table1 | IntColumn | int | 0 |
| table2 | StringColumn | string | 2 |
| table1 | StringColumn | string | 2 |
| table2 | IntColumn | int | 0 |
| table1 | DecColumn | decimal(15,2) | 1 |
| table2 | DecColumn | decimal(15,2) | 1 |
+-----------+--------------+---------------+-----------------+
I am trying to get output like this:
+-----------+----------------------------------------------------------------+
| tablename | columndefs |
+-----------+----------------------------------------------------------------+
| table1 | IntColumn int, DecColumn decimal(15,2), StringColumn string |
| table2 | IntColumn int, DecColumn decimal(15,2), StringColumn string |
+-----------+----------------------------------------------------------------+
Instead, I wind up with something like this:
+-----------+----------------------------------------------------------------+
| tablename | columndefs |
+-----------+----------------------------------------------------------------+
| table1 | IntColumn int, StringColumn string, DecColumn decimal(15,2) |
| table2 | StringColumn string, IntColumn int, DecColumn decimal(15,2) |
+-----------+----------------------------------------------------------------+
Do I need to build a UDF to ensure I get proper order? I need the output to end up in a dataframe for comparison purposes, not just building the CREATE TABLE statement.
Upvotes: 0
Views: 2185
You can create a struct column of (ordinalposition, columndef) and apply sort_array to sort the aggregated columndef in the wanted order during the groupBy/agg step. sort_array compares structs field by field, so ordinalposition, the first field, drives the ordering:
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("table1", "IntColumn", "int", "0"),
  ("table2", "StringColumn", "string", "2"),
  ("table1", "StringColumn", "string", "2"),
  ("table2", "IntColumn", "int", "0"),
  ("table1", "DecColumn", "decimal(15,2)", "1"),
  ("table2", "DecColumn", "decimal(15,2)", "1")
).toDF("tablename", "columnname", "datatype", "ordinalposition")

df.
  // Pair each column definition with its ordinal position; cast the
  // position to int so "10" doesn't sort before "2" lexicographically.
  withColumn("columndef",
    struct($"ordinalposition".cast("int").as("pos"),
           concat($"columnname", lit(" "), $"datatype").as("cdef"))
  ).
  // sort_array orders the collected structs by pos (the first field).
  groupBy("tablename").agg(sort_array(collect_list($"columndef")).as("sortedlist")).
  withColumn("columndefs", concat_ws(", ", $"sortedlist.cdef")).
  drop("sortedlist").
  show(false)
// +---------+-----------------------------------------------------------+
// |tablename|columndefs |
// +---------+-----------------------------------------------------------+
// |table2 |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// |table1 |IntColumn int, DecColumn decimal(15,2), StringColumn string|
// +---------+-----------------------------------------------------------+
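If you also need the actual CREATE TABLE statement (the stated end goal), here is a minimal sketch building it from the aggregated result. It assumes the aggregation above ends with drop("sortedlist") and is assigned to a val, named sorted here (a hypothetical name), instead of calling show:

// Concatenate each row into a DDL string; the result stays in a
// DataFrame, which also covers the comparison requirement in the question.
val ddl = sorted.select(
  concat(lit("CREATE TABLE "), $"tablename", lit(" ("), $"columndefs", lit(")")).as("createstmt")
)
ddl.show(false)
// CREATE TABLE table1 (IntColumn int, DecColumn decimal(15,2), StringColumn string)
// CREATE TABLE table2 (IntColumn int, DecColumn decimal(15,2), StringColumn string)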
Upvotes: 1