Jesus Zuñiga

Reputation: 135

How could I convert a DataFrame column name into a value in Spark-Scala?

Hello guys, I need some recommendations on this problem. I have this DataFrame:

+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|_id                     |h                   |inc|op |ts        |webhooks__0__failed_at |webhooks__0__status|webhooks__0__updated_at|webhooks__1__failed_at |webhooks__1__updated_at|webhooks__2__failed_at |webhooks__2__updated_at|webhooks__3__failed_at|webhooks__3__updated_at|webhooks__5__failed_at|webhooks__5__updated_at|
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|5926115bffecf947d9fdf965|-3783513890158363801|148|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.813|2019-07-25 17:55:39.819|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|-6421919050082865687|151|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.822|2019-07-25 17:55:39.845|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|-1953717027542703837|155|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.873|2019-07-25 17:55:39.878|null                   |null                   |null                  |null                   |null                  |null                   |
|5926115bffecf947d9fdf965|7260191374440479618 |159|u  |1564077339|null                   |null               |null                   |2019-07-25 17:55:39.945|2019-07-25 17:55:39.951|null                   |null                   |null                  |null                   |null                  |null                   |
|57d17de901cc6a6c9e0000ab|-2430099739381353477|131|u  |1564077339|2019-07-25 17:55:39.722|error              |2019-07-25 17:55:39.731|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u  |1564077341|null                   |listening          |2019-07-25 17:55:41.453|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u  |1564077341|null                   |listening          |2019-07-25 17:55:41.453|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.768|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.767|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.767|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u  |1564077342|null                   |null               |2019-07-25 17:55:42.216|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u  |1564077341|null                   |null               |2019-07-25 17:55:41.768|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u  |1564077342|null                   |null               |2019-07-25 17:55:42.216|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null                  |null                   |null                  |null                   |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u  |1564077346|null                   |null               |null                   |null                   |null                   |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null                  |null                   |null                  |null                   |
|594e88f1ffecf918a14c143e|736029767610412482  |58 |u  |1564077346|2019-07-25 17:55:46.503|null               |2019-07-25 17:55:46.513|null                   |null                   |null                   |null                   |null                  |null                   |null                  |null                   |
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+

The column names keep growing in a format like this:

webhooks__0__failed_at, webhooks__1__failed_at, webhooks__2__failed_at, ...

Would it be possible to make a new DataFrame that takes the number in the column name as an index and groups the results like this?

Index | webhooks__failed_at         | webhooks__status
0     | null                        | null
0     | null                        | null
0     | 2019-07-25 17:55:39.722     | error

Upvotes: 1

Views: 2263

Answers (2)

rluta

Reputation: 6897

If your initial DataFrame is referenced as df and has the following schema:

df.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhooks__0__failed_at: string (nullable = true)
 |-- webhooks__0__status: string (nullable = true)
 |-- webhooks__0__updated_at: string (nullable = true)
 |-- webhooks__1__failed_at: string (nullable = true)
 |-- webhooks__1__updated_at: string (nullable = true)
 |-- webhooks__2__failed_at: string (nullable = true)
 |-- webhooks__2__updated_at: string (nullable = true)
 |-- webhooks__3__failed_at: string (nullable = true)
 |-- webhooks__3__updated_at: string (nullable = true)
 |-- webhooks__5__failed_at: string (nullable = true)
 |-- webhooks__5__updated_at: string (nullable = true)

You can regroup all the webhook data into an array of structs simply by manipulating column name expressions, and you can use the lit Spark function to insert the column names (here, the webhook index) as values in the resulting dataset.
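For a single index the idea looks like this (a hand-written sketch for index 0 only, using struct, lit and col from org.apache.spark.sql.functions, which are imported in the full code below; the generalized version follows):

// The index "0" comes from the column name and is injected as a literal value
val webhook0 = struct(
  lit("0").as("index"),
  col("webhooks__0__failed_at").as("failed_at"),
  col("webhooks__0__status").as("status"),
  col("webhooks__0__updated_at").as("updated_at")
)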

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import df.sparkSession.implicits._

// Split the columns into webhook columns and the remaining base columns
val (webhooks_columns, base_columns) = df.columns.partition(_.startsWith("webhooks"))

// Parse each webhook column name into (index, field),
// e.g. "webhooks__0__failed_at" -> ("0", "failed_at")
val parsed_webhooks_columns = webhooks_columns
     .map(_.split("__"))
     .map { case Array(_: String, idx: String, f: String) => (idx, f) }

// All field names seen across any index (failed_at, status, updated_at)
val all_fields = parsed_webhooks_columns.map(_._2).toSet

// Build one struct per webhook index; fields missing for an index are padded
// with typed nulls so every struct has exactly the same shape
val webhooks_structs = parsed_webhooks_columns
    .groupBy(_._1)
    .map(t => {
      val fields = t._2.map(_._2)
      val all_struct_fields = 
          Seq(lit(t._1).as("index")) ++ 
          all_fields.map { f =>
            if (fields.contains(f))
                col(s"webhooks__${t._1}__${f}").as(f)
            else
                lit(null).cast(StringType).as(f)
          }
      struct(all_struct_fields:_*)
    }).toArray


// Keep the base columns and gather all webhook structs into a single array column
val df_step1 = df.select(base_columns.map(col) ++
    Seq(array(webhooks_structs:_*).as("webhooks")):_*)

Most of the complexity in the code above comes from the fact that the number of fields varies with the webhook index (index 0 has a status field not found in the other indexes), and you need to ensure all structs have exactly the same columns, with the same types and in the same order, for the transformation to work.
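For example, index 1 has no status column in the source, so its generated struct pads that field with a typed null (a hand-expanded sketch of what the code above produces; the actual field order follows the set iteration order):

struct(lit("1").as("index"),
       col("webhooks__1__failed_at").as("failed_at"),
       lit(null).cast(StringType).as("status"),
       col("webhooks__1__updated_at").as("updated_at"))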

You end up with the following schema:

df_step1.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhooks: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- index: string (nullable = false)
 |    |    |-- failed_at: string (nullable = true)
 |    |    |-- status: string (nullable = true)
 |    |    |-- updated_at: string (nullable = true)

Now you can explode the dataset to split the different webhooks into separate rows:

val df_step2 = df_step1.withColumn("webhook", explode('webhooks)).drop("webhooks")

And you get the following schema:

df_step2.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhook: struct (nullable = false)
 |    |-- index: string (nullable = false)
 |    |-- failed_at: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- updated_at: string (nullable = true)

You can then optionally flatten the dataset to simplify the final schema:

val df_step2_flattened = df_step2.schema
       .filter(_.name == "webhook")
       .flatMap(_.dataType.asInstanceOf[StructType])
       .map(f => (s"webhook_${f.name}", 'webhook(f.name)))
       .foldLeft(df_step2) { case (df, (colname, colspec)) => df.withColumn(colname, colspec) }
       .drop("webhook")

At this point you'll probably want to filter out rows with null webhook_updated_at and run whatever aggregation you need.
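For example, counting failures per webhook index could look like this (a sketch, assuming that is the aggregation you are after; the column names come from the flattened schema shown next):

// Keep only rows for webhook slots that were actually populated, then count failures per index
val failures_per_index = df_step2_flattened
  .filter('webhook_updated_at.isNotNull)
  .filter('webhook_failed_at.isNotNull)
  .groupBy('webhook_index)
  .agg(count("*").as("failed_count"))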

Your final schema is now:

df_step2_flattened.printSchema
root
 |-- _id: string (nullable = true)
 |-- h: string (nullable = true)
 |-- inc: string (nullable = true)
 |-- op: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- webhook_index: string (nullable = false)
 |-- webhook_failed_at: string (nullable = true)
 |-- webhook_status: string (nullable = true)
 |-- webhook_updated_at: string (nullable = true)

This is not the only way to do what you want, but the key benefit of the approach above is that it uses only built-in Spark expressions and functions and can therefore fully leverage the Catalyst engine optimizations.

Upvotes: 4

afeldman

Reputation: 512

I would advise looping. The example below is basic, but it could help point you in the right direction. It searches for one column pattern instead of two; however, it can be extended to handle multiple different columns and built into a sub-process if need be (see the sketch after the code).

//Build the DataFrame
val inputDF = spark.sql("select 'a' as Column_1, 'value_1' as test_0_value, 'value_2' as test_1_value, 'value_3' as test_2_value, 'value_4' as test_3_value")
//Needed below for toDF on a local Seq (already in scope in spark-shell)
import spark.implicits._

//Make my TempDFs
var interimDF = spark.sql("select 'at-at' as column_1")
var actionDF = interimDF
var finalDF = interimDF

//This would be your search and replacement characteristics
val lookForValue = "test"
val replacementName = "test_check"

//Holds the constants
var constantArray = Array("Column_1")
//Based on above makes an array based on the columns you need to hit
var changeArray = Seq(inputDF.columns:_*).toDF("Columns").where("Columns rlike '" + lookForValue + "'").rdd.map(x=>x.mkString).collect

//Iterator
var iterator = 1

//Holds the select expressions built on each iteration
var runStatement = Array.empty[String]

//Runs until all columns are hit
while(iterator <= changeArray.length) {
  //Adds constants
  runStatement = constantArray
  //Adds the current iteration columns
  runStatement = runStatement ++ Array(changeArray(iterator - 1) + " as " + replacementName)
  //Adds the iteration event
  runStatement = runStatement ++ Array("'" + iterator + "' as Iteration_Number")

  //Runs all the prebuilt commands
  actionDF = inputDF.selectExpr(runStatement:_*)

  //Going input -> action -> interim <-> final keeps interim and final semi-dynamic and allows vertical and horizontal catalogue keeping in Spark
  interimDF = if(iterator == 1) {
    actionDF
  } else {
    finalDF.union(actionDF) //unionAll is deprecated; union is equivalent here
  }
  finalDF = interimDF
  iterator = iterator + 1
}
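
Applied to the webhooks columns from the question, the same loop-and-union idea would look roughly like this (a sketch only, assuming df is the original DataFrame, the indexes 0 through 5, and the failed_at/status/updated_at fields from its schema; columns missing for an index are padded with nulls):

val baseCols = Seq("_id", "h", "inc", "op", "ts")
val fields = Seq("failed_at", "status", "updated_at")
val indexes = Seq("0", "1", "2", "3", "5")

//Build one projection per index, then stack them with union
val perIndexDFs = indexes.map { idx =>
  val fieldExprs = fields.map { f =>
    val c = s"webhooks__${idx}__${f}"
    if (df.columns.contains(c)) s"$c as webhooks__$f"
    else s"cast(null as string) as webhooks__$f"
  }
  df.selectExpr(baseCols ++ Seq(s"'$idx' as Index") ++ fieldExprs: _*)
}
val result = perIndexDFs.reduce(_ union _)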

Upvotes: 1
