Reputation: 373
I have a Spark dataframe like this:
+---+---+------+------+
|  A|  B|2017Q1|2017Q2|
+---+---+------+------+
|  1|101|   0.6|   0.8|
|  2|102|   0.7|   0.9|
|  3|103|   0.9|   0.4|
|...|...|   ...|   ...|
+---+---+------+------+
Here the year and quarter columns can be dynamic, meaning I might get columns for 2017Q3, 2017Q4, etc. as well.
I want to convert the column values for 2017Q1 and 2017Q2 into rows, like this:
+---+---+----+-------+---+
|  A|  B|Year|Quarter|Val|
+---+---+----+-------+---+
|  1|101|2017|      1|0.6|
|  1|101|2017|      2|0.8|
|  2|102|2017|      1|0.7|
|...|...| ...|    ...|...|
+---+---+----+-------+---+
Can somebody please help me with this? I am using Spark 2.4.4.
Upvotes: 1
Views: 534
Reputation: 10406
In Spark SQL, you can create an array containing the values of all the quarters and explode it. Since you need to remember which value corresponds to which quarter, you can wrap each value in a struct that binds the quarter's index to its value. To make it dynamic, you can work from a list of the quarter columns; to make it even more generic, we can extract that list from the dataframe's column names as follows.
val quarters = df.columns
  .filter(_.matches("[0-9]{4}Q[1-4]")) // all the columns matching the regex
  .sorted

df.withColumn("value", explode(array(
    quarters.indices.map(i =>
      // bind the quarter's index to the corresponding value
      struct(lit(i + 1) as "quarter", col(quarters(i)) as "val")
    ) : _*
  )))
  .withColumn("Quarter", $"value.quarter")
  .withColumn("Val", $"value.val")
  .drop(quarters :+ "value" : _*)
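The expected output also has a Year column, which the snippet above does not produce. Since the matched column names always look like 2017Q3, the year and quarter can be parsed out of the name itself. Here is a minimal sketch of that parsing (parseQuarterCol is a hypothetical helper, not part of the answer's code):

```scala
// Hypothetical helper: split a column name like "2017Q3" into (year, quarter).
// The regex filter above guarantees the name is four digits, a 'Q', then one digit.
def parseQuarterCol(name: String): (Int, Int) = {
  val Array(year, quarter) = name.split("Q")
  (year.toInt, quarter.toInt)
}

// Plugged into the answer's approach, each array element would become
//   struct(lit(year) as "year", lit(quarter) as "quarter", col(name) as "val")
// followed by .withColumn("Year", $"value.year") and so on.
```

For example, `parseQuarterCol("2017Q3")` returns `(2017, 3)`.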
EDIT: Since it was asked in the comments, here is a (painful) Java version:
// Needed imports (in addition to the usual Spark ones):
// import java.util.*; import java.util.stream.Collectors;
// import scala.collection.JavaConverters; import scala.collection.Seq;
// import org.apache.spark.sql.Column;
// import static org.apache.spark.sql.functions.*;

// a function to generate struct(lit(i+1) as "quarter", col(quarters(i)) as "val")
public static Column generateStruct(int i, List<String> quarters) {
    Column[] columns = {
        lit(i + 1).alias("quarter"),       // the quarter's index
        col(quarters.get(i)).alias("val")  // the corresponding value
    };
    Seq<Column> columnSeq = JavaConverters
        .asScalaBufferConverter(Arrays.asList(columns))
        .asScala().toSeq();
    return struct(columnSeq);
}
List<String> quarters = Arrays.stream(df.columns())
    .filter(x -> x.matches("[0-9]{4}Q[1-4]"))
    .sorted()
    .collect(Collectors.toList());

List<Column> quarterColumns = new ArrayList<>();
for (int i = 0; i < quarters.size(); i++)
    quarterColumns.add(generateStruct(i, quarters));

df.withColumn("value", explode(array(
        JavaConverters.asScalaBufferConverter(quarterColumns).asScala().toSeq()
    )))
    .withColumn("Quarter", col("value.quarter"))
    .withColumn("Val", col("value.val"))
    .drop(JavaConverters.asScalaBufferConverter(quarters).asScala().toSeq())
    .drop("value");
Upvotes: 2