Reputation: 135
Hello guys, I need some recommendations on this problem. I have this DataFrame:
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|_id |h |inc|op |ts |webhooks__0__failed_at |webhooks__0__status|webhooks__0__updated_at|webhooks__1__failed_at |webhooks__1__updated_at|webhooks__2__failed_at |webhooks__2__updated_at|webhooks__3__failed_at|webhooks__3__updated_at|webhooks__5__failed_at|webhooks__5__updated_at|
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
|5926115bffecf947d9fdf965|-3783513890158363801|148|u |1564077339|null |null |null |2019-07-25 17:55:39.813|2019-07-25 17:55:39.819|null |null |null |null |null |null |
|5926115bffecf947d9fdf965|-6421919050082865687|151|u |1564077339|null |null |null |2019-07-25 17:55:39.822|2019-07-25 17:55:39.845|null |null |null |null |null |null |
|5926115bffecf947d9fdf965|-1953717027542703837|155|u |1564077339|null |null |null |2019-07-25 17:55:39.873|2019-07-25 17:55:39.878|null |null |null |null |null |null |
|5926115bffecf947d9fdf965|7260191374440479618 |159|u |1564077339|null |null |null |2019-07-25 17:55:39.945|2019-07-25 17:55:39.951|null |null |null |null |null |null |
|57d17de901cc6a6c9e0000ab|-2430099739381353477|131|u |1564077339|2019-07-25 17:55:39.722|error |2019-07-25 17:55:39.731|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u |1564077341|null |listening |2019-07-25 17:55:41.453|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|4122669520839049341 |30 |u |1564077341|null |listening |2019-07-25 17:55:41.453|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u |1564077341|null |null |2019-07-25 17:55:41.768|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u |1564077341|null |null |2019-07-25 17:55:41.767|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|1897433358396319399 |58 |u |1564077341|null |null |2019-07-25 17:55:41.767|null |null |null |null |null |null |null |null |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u |1564077342|null |null |2019-07-25 17:55:42.216|null |null |null |null |null |null |null |null |
|5b9bf21bffecf966c2878b11|-7191334145177061427|60 |u |1564077341|null |null |2019-07-25 17:55:41.768|null |null |null |null |null |null |null |null |
|58c6d048edbb6e09eb177639|8363076784039152000 |23 |u |1564077342|null |null |2019-07-25 17:55:42.216|null |null |null |null |null |null |null |null |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null |null |null |null |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null |null |null |null |
|5ac6a0d3b795b013a5a73a43|-1747137668935062717|34 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.385|2019-07-25 17:55:46.398|null |null |null |null |
|5ac6a0d3b795b013a5a73a43|-3790832816225805697|36 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.384|2019-07-25 17:55:46.400|null |null |null |null |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null |null |null |null |
|5ac6a0d3b795b013a5a73a43|6060575882395080442 |63 |u |1564077346|null |null |null |null |null |2019-07-25 17:55:46.506|2019-07-25 17:55:46.529|null |null |null |null |
|594e88f1ffecf918a14c143e|736029767610412482 |58 |u |1564077346|2019-07-25 17:55:46.503|null |2019-07-25 17:55:46.513|null |null |null |null |null |null |null |null |
+------------------------+--------------------+---+---+----------+-----------------------+-------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+----------------------+-----------------------+----------------------+-----------------------+
The column names keep growing in a format like this:
webhooks__0__failed_at, webhooks__1__failed_at, ...
Would it be possible to make a new DataFrame that takes the number in the column name as an index and groups the results like this?
Index | webhooks__failed_at | webhooks__status
0 | null | null
0 | null | null
0 | 2019-07-25 17:55:39.722 | error
Upvotes: 1
Views: 2263
Reputation: 6897
If your initial DataFrame is referenced as df and has the following schema:
df.printSchema
root
|-- _id: string (nullable = true)
|-- h: string (nullable = true)
|-- inc: string (nullable = true)
|-- op: string (nullable = true)
|-- ts: string (nullable = true)
|-- webhooks__0__failed_at: string (nullable = true)
|-- webhooks__0__status: string (nullable = true)
|-- webhooks__0__updated_at: string (nullable = true)
|-- webhooks__1__failed_at: string (nullable = true)
|-- webhooks__1__updated_at: string (nullable = true)
|-- webhooks__2__failed_at: string (nullable = true)
|-- webhooks__2__updated_at: string (nullable = true)
|-- webhooks__3__failed_at: string (nullable = true)
|-- webhooks__3__updated_at: string (nullable = true)
|-- webhooks__5__failed_at: string (nullable = true)
|-- webhooks__5__updated_at: string (nullable = true)
You can regroup all the webhook data into an array of structs simply by manipulating column name expressions, and you can use the lit Spark function to insert the index parsed from each column name as a value in the resulting dataset.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import df.sparkSession.implicits._

// Split the column names into webhook columns and the remaining base columns
val (webhooks_columns, base_columns) = df.columns.partition(_.startsWith("webhooks"))

// Parse "webhooks__<idx>__<field>" into (idx, field) pairs
val parsed_webhooks_columns = webhooks_columns
  .map(_.split("__"))
  .map { case Array(_, idx, f) => (idx, f) }

// All field names seen for any index (failed_at, status, updated_at)
val all_fields = parsed_webhooks_columns.map(_._2).toSet

// Build one struct expression per index, padding missing fields with null columns
val webhooks_structs = parsed_webhooks_columns
  .groupBy(_._1)
  .map { case (idx, cols) =>
    val fields = cols.map(_._2)
    val all_struct_fields =
      Seq(lit(idx).as("index")) ++
        all_fields.map { f =>
          if (fields.contains(f)) col(s"webhooks__${idx}__${f}").as(f)
          else lit(null).cast(StringType).as(f)
        }
    struct(all_struct_fields: _*)
  }
  .toArray

val df_step1 = df.select(
  base_columns.map(col) ++ Seq(array(webhooks_structs: _*).as("webhooks")): _*)
Most of the complexity in the code above comes from the fact that you have a varying number of fields depending on the webhook index (index 0 has a status field not found in the other indexes), and you need to ensure all structs have exactly the same columns, with the same types and in the same order, for the transformation to work.
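To make that concrete, the struct expression generated for index 1 would be roughly equivalent to the snippet below (the field order may differ because all_fields is a Set; this is only an illustration of the null padding, not extra code you need to write):
// Roughly what the generated expression looks like for index 1,
// which has failed_at and updated_at columns but no status column
struct(
  lit("1").as("index"),
  col("webhooks__1__failed_at").as("failed_at"),
  lit(null).cast(StringType).as("status"),  // padded with a null column
  col("webhooks__1__updated_at").as("updated_at")
)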
You end up with the following schema:
df_step1.printSchema
root
|-- _id: string (nullable = true)
|-- h: string (nullable = true)
|-- inc: string (nullable = true)
|-- op: string (nullable = true)
|-- ts: string (nullable = true)
|-- webhooks: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- index: string (nullable = false)
| | |-- failed_at: string (nullable = true)
| | |-- status: string (nullable = true)
| | |-- updated_at: string (nullable = true)
Now you can explode the dataset to split the different webhooks into separate rows:
val df_step2 = df_step1.withColumn("webhook", explode('webhooks)).drop("webhooks")
And you get the following schema
df_step2.printSchema
root
|-- _id: string (nullable = true)
|-- h: string (nullable = true)
|-- inc: string (nullable = true)
|-- op: string (nullable = true)
|-- ts: string (nullable = true)
|-- webhook: struct (nullable = false)
| |-- index: string (nullable = false)
| |-- failed_at: string (nullable = true)
| |-- status: string (nullable = true)
| |-- updated_at: string (nullable = true)
You can then optionally flatten the dataset to simplify the final schema:
// Promote each field of the webhook struct to a top-level webhook_* column
val df_step2_flattened = df_step2.schema
  .filter(_.name == "webhook")
  .flatMap(_.dataType.asInstanceOf[StructType])
  .map(f => (s"webhook_${f.name}", 'webhook(f.name)))
  .foldLeft(df_step2) { case (df, (colname, colspec)) => df.withColumn(colname, colspec) }
  .drop("webhook")
At this point you'll probably want to filter out the rows with a null webhook_updated_at and run whatever aggregation you need (see the sketch after the schema below).
Your final schema is now:
df_step2_flattened.printSchema
root
|-- _id: string (nullable = true)
|-- h: string (nullable = true)
|-- inc: string (nullable = true)
|-- op: string (nullable = true)
|-- ts: string (nullable = true)
|-- webhook_index: string (nullable = false)
|-- webhook_failed_at: string (nullable = true)
|-- webhook_status: string (nullable = true)
|-- webhook_updated_at: string (nullable = true)
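For example, the filtering could look like the sketch below; the groupBy/agg part is only a placeholder (df_final and the grouping keys are made up here), since the aggregation depends on what you actually need to compute:
// Keep only the rows where the webhook was actually touched, then aggregate.
// The groupBy/agg below is an illustrative placeholder, not part of the question.
val df_final = df_step2_flattened
  .filter(col("webhook_updated_at").isNotNull)
  .groupBy(col("_id"), col("webhook_index"))
  .agg(max(col("webhook_updated_at")).as("last_updated_at"))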
This is not the only way to do what you want, but the key benefit of the approach above is that it only uses built-in Spark expressions and functions, and can therefore fully leverage the Catalyst engine optimizations.
Upvotes: 4
Reputation: 512
I would advise looping. The example below is basic, but it could help point you in the right direction. It only searches for one column pattern instead of two; however, it can be extended to factor in multiple different columns and built into a sub-process if need be.
//Build the DataFrame
val inputDF = spark.sql("select 'a' as Column_1, 'value_1' as test_0_value, 'value_2' as test_1_value, 'value_3' as test_2_value, 'value_4' as test_3_value")
//Make my TempDFs
var interimDF = spark.sql("select 'at-at' as column_1")
var actionDF = interimDF
var finalDF = interimDF
//This would be your search and replacement characteristics
val lookForValue = "test"
val replacementName = "test_check"
//Holds the constants
var constantArray = Array("Column_1")
//Based on above makes an array based on the columns you need to hit
var changeArray = Seq(inputDF.columns:_*).toDF("Columns").where("Columns rlike '" + lookForValue + "'").rdd.map(x=>x.mkString).collect
//Iterator
var iterator = 1
//Need this for below to run commands
var runStatement = Array("")
//Runs until all columns are hit
while(iterator <= changeArray.length) {
  //Adds constants
  runStatement = constantArray
  //Adds the current iteration columns
  runStatement = runStatement ++ Array(changeArray(iterator - 1) + " as " + replacementName)
  //Adds the iteration number
  runStatement = runStatement ++ Array("'" + iterator + "' as Iteration_Number")
  //Runs all the prebuilt commands
  actionDF = inputDF.selectExpr(runStatement:_*)
  //Going from input -> action -> interim <-> final keeps interim and final semi-dynamic and allows vertical and horizontal catalogue keeping in Spark
  interimDF = if(iterator == 1) {
    actionDF
  } else {
    finalDF.unionAll(actionDF)
  }
  finalDF = interimDF
  iterator = iterator + 1
}
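With the sample inputDF above, the loop should leave finalDF with the columns Column_1, test_check and Iteration_Number, one row per column that matched lookForValue. A quick way to eyeball the result (just a usage sketch):
//Inspect the looped result
finalDF.show(false)
finalDF.printSchema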
Upvotes: 1