Reputation: 517
I have a DataFrame with one array-of-struct column. The sample schema is:
root
|-- Data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)
The field name
holds a column name and the field value
holds that column's value. The number of elements in the Data
column is not fixed, so it can vary from row to row. I need to parse that data and get rid of the nested structure. (A plain array explode will not work here because all the data in one row belongs to one logical record.) The real schema is much bigger and has multiple array fields like 'Data', so my aim is to create a general solution that I can apply to similar struct arrays.
Example:
Sample data:
val data = Seq(
"""{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName", "value": "Strong" }]}""",
"""{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName", "value": "Nesta " }, { "name": "LName", "value": "Marley" }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))
Expected Result:
+-------+------+
| FName| LName|
+-------+------+
| Alex|Strong|
|Robert |Marley|
+-------+------+
As a solution I have created a UDF which I execute on the whole Data
column. As input parameters I pass the column and the name of the field I want to extract.
val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
  var value = ""
  arr.foreach(el =>
    if (el.getAs[String]("name") == columnName) {
      value = el.getAs[String]("value")
    }
  )
  value
}}
The problem is that I am using the variable value
to store an intermediate result, and I don't want to create a new variable for each row on which my UDF is executed.
This is how I execute my UDF (the query generates the expected result):
df.select(
  find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),
  find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")
).show()
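Since the real schema has several such arrays, the per-field lookup can be generalized to a list of field names. A minimal Spark-free sketch of the idea, where the hypothetical `Entry` case class stands in for the `{name, value}` struct elements (in Spark, the same mapping over field names would build one UDF column per field for a single `select`):

```scala
// Hypothetical stand-in for the {name, value} struct elements in Data.
case class Entry(name: String, value: String)

// Extract several fields from one row's array: each requested name
// maps to its value, or null when no entry carries that name.
def extractFields(arr: Seq[Entry], fieldNames: Seq[String]): Map[String, String] =
  fieldNames.map(n => n -> arr.find(_.name == n).map(_.value).orNull).toMap

val row = Seq(Entry("FName", "Alex"), Entry("LName", "Strong"))
extractFields(row, Seq("FName", "LName", "MName"))
// Map(FName -> Alex, LName -> Strong, MName -> null)
```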
I would be happy to hear any comments on how I can improve the UDF's logic, and about different ways to solve the parsing issue.
Upvotes: 3
Views: 13496
Reputation: 517
I have solved the issue by substituting the foreach
loop with the find
method:
val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
  arr.find(_.getAs[String]("name") == columnName) match {
    case Some(i) => i.getAs[String]("value")
    case None => null
  }
}
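The match on Option can be condensed further with `map`/`orNull`; the behavior is identical. A Spark-free sketch, using a hypothetical `Entry` case class in place of `Row`:

```scala
// Hypothetical stand-in for the {name, value} struct elements.
case class Entry(name: String, value: String)

// Same semantics as the pattern match above: the first matching
// entry's value, or null when no entry has the requested name.
def findValue(arr: Seq[Entry], columnName: String): String =
  arr.find(_.name == columnName).map(_.value).orNull

val row = Seq(Entry("FName", "Alex"), Entry("LName", "Strong"))
findValue(row, "LName") // "Strong"
findValue(row, "MName") // null
```

In the actual UDF this becomes `arr.find(_.getAs[String]("name") == columnName).map(_.getAs[String]("value")).orNull`.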
Upvotes: 6
Reputation: 6323
Perhaps this is helpful:
val data = Seq(
"""{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName", "value": "Strong" }]}""",
"""{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName", "value": "Nesta " }, {
|"name": "LName", "value": "Marley" }]}""".stripMargin
)
// toDS() requires: import spark.implicits._
val df = spark.read.json(data.toDS())
df.show(false)
df.printSchema()
/**
* +----------------------------------------------------+
* |Data |
* +----------------------------------------------------+
* |[[FName, Alex], [LName, Strong]] |
* |[[FName, Robert ], [MName, Nesta ], [LName, Marley]]|
* +----------------------------------------------------+
*
* root
* |-- Data: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- name: string (nullable = true)
* | | |-- value: string (nullable = true)
*/
df.selectExpr("inline_outer(Data)")
.groupBy()
.pivot("name")
.agg(collect_list("value"))
.withColumn("x", arrays_zip($"FName", $"LName"))
.selectExpr("inline_outer(x)")
.show(false)
/**
* +-------+------+
* |FName |LName |
* +-------+------+
* |Alex |Strong|
* |Robert |Marley|
* +-------+------+
*/
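For reference, arrays_zip pairs the collected lists positionally, like Scala's own zip, and inline_outer then turns each pair into an output row. Note that this relies on the two collect_list results coming back in matching order, which Spark does not guarantee in general. A minimal Spark-free sketch of the pairing step:

```scala
// The pivoted columns each hold one list per field; arrays_zip pairs
// them positionally, exactly like Scala's zip on plain sequences.
val fnames = Seq("Alex", "Robert ")
val lnames = Seq("Strong", "Marley")
val rows = fnames.zip(lnames) // List((Alex,Strong), (Robert ,Marley))
```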
Upvotes: 2