sovan

Reputation: 373

Convert a Spark Dataframe Column to Rows

I have a Spark dataframe like this:

    +-----+-----+-------+--------+
    |  A  |  B  |2017Q1 | 2017Q2 |
    +-----+-----+-------+--------+
    |  1  |  101|  0.6  |  0.8   |
    |  2  |  102|  0.7  |  0.9   |
    |  3  |  103|  0.9  |  0.4   |
    |  ...|  ...|  ...  |  ...   |
    +-----+-----+-------+--------+

Here the year and quarter columns can be dynamic, meaning I might get columns for 2017Q3, 2017Q4, etc. as well. I want to convert the column values for 2017Q1 and 2017Q2 into rows, like this:

    +-----+-----+-------+--------+-----+
    |  A  |  B  | Year  | Quarter| Val |
    +-----+-----+-------+--------+-----+
    |  1  |  101| 2017  |  1     | 0.6 |
    |  1  |  101| 2017  |  2     | 0.8 |
    |  2  |  102| 2017  |  1     | 0.7 |
    |  ...|  ...|  ...  |  ...   | ... |
    +-----+-----+-------+--------+-----+

Can somebody please help me with this? I am using Spark 2.4.4.

Upvotes: 1

Views: 534

Answers (1)

Oli

Reputation: 10406

In Spark SQL, you can build an array containing the values of all the quarters and explode it. Since you need to remember which value corresponds to which quarter, you can use a struct to bind each quarter's index to its value. To keep it dynamic, work from a list of the quarter columns; to make it fully generic, extract that list from the dataframe's column names, as follows.

val quarters = df.columns
    .filter( _.matches("[0-9]{4}Q[1-4]") ) // keep only the columns matching the year/quarter pattern
    .sorted

df.withColumn("value", explode(array(
       quarters.indices.map(i =>
            struct(lit(i+1) as "quarter", col(quarters(i)) as "val")
       ) : _*
  )))
  .withColumn("Quarter", $"value.quarter")
  .withColumn("Val", $"value.val")
  .drop( quarters :+ "value" : _*) // drop the original quarter columns and the temporary struct
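Note that the question also asks for a Year column, and the quarter index alone is not enough once columns from several years appear. A minimal sketch of one way to get both pieces, parsing them out of a column name such as `2017Q3` (the `parseQuarter` helper and its regex are my own illustration, not part of the solution above):

```scala
// Sketch: parse a column name such as "2017Q3" into (year, quarter).
// This helper is an illustrative addition, not from the answer above.
val QuarterCol = "([0-9]{4})Q([1-4])".r

def parseQuarter(name: String): (Int, Int) = name match {
  case QuarterCol(year, quarter) => (year.toInt, quarter.toInt)
}
```

Each struct in the `array` above could then carry three fields, e.g. `struct(lit(year) as "year", lit(quarter) as "quarter", col(name) as "val")`, from which `Year`, `Quarter` and `Val` are extracted exactly as `Quarter` and `Val` are today.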

EDIT Since it was asked in the comments, here is a (painful) Java version:

// a function to generate struct(lit(i+1) as "quarter", col(quarters.get(i)) as "val")
public static Column generateStruct(int i, List<String> quarters) {
    Column[] columns = {
        lit(i+1).alias("quarter"),
        col(quarters.get(i)).alias("val")
    };
    Seq<Column> columnSeq = JavaConverters
        .asScalaBufferConverter(Arrays.asList(columns))
        .asScala().toSeq();
    return struct(columnSeq);
}

List<String> quarters = Arrays.stream(df.columns())
    .filter(x -> x.matches("[0-9]{4}Q[1-4]"))
    .sorted().collect(Collectors.toList());

List<Column> quarterColumns = new ArrayList<>();
for(int i = 0; i < quarters.size(); i++)
    quarterColumns.add(generateStruct(i, quarters));


df
    .withColumn("value", explode(array(
        JavaConverters.asScalaBufferConverter(quarterColumns).asScala().toSeq()
    )))
    .withColumn("Quarter", col("value.quarter"))
    .withColumn("Val", col("value.val"))
    .drop(JavaConverters.asScalaBufferConverter(quarters).asScala().toSeq())
    .drop("value");

Upvotes: 2
