Aleksejs R

Reputation: 517

Array of struct parsing in Spark dataframe

I have a DataFrame with one array-of-struct column. The sample schema is:

root
 |-- Data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)

The name field holds a column name and the value field holds that column's value. The number of elements in the Data column is not fixed, so it can vary from row to row. I need to parse that data and flatten the nested structure. (An array explode will not work here because all the data in one row belongs to a single output record.) The real schema is much bigger and has multiple array fields like Data, so my aim is a general solution I can apply to any array with a similar structure. Example:

Sample data:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }, { "name": "LName",   "value": "Marley"  }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))

Expected Result:

+-------+------+
|  FName| LName|
+-------+------+
|   Alex|Strong|
|Robert |Marley|
+-------+------+
 

As a solution I have created a UDF which I execute on the whole Data column. As input parameters I pass the column and the name of the field I want to extract.

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
    var value = ""
    arr.foreach(el =>
        if (el.getAs[String]("name") == columnName) {
            value = el.getAs[String]("value")
        }
    )
    value
}}

The problem is that I am using the variable value to store an intermediate result, and I don't want to create a new variable for each row on which my UDF is executed.

This is how I am executing my UDF (this query generates the expected result):

df.select(
    find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),
    find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")
).show()

I would be happy to hear any comments on how I can improve the UDF's logic, as well as different ways to solve the parsing issue.

Upvotes: 3

Views: 13496

Answers (2)

Aleksejs R

Reputation: 517

I have solved the issue by substituting the foreach loop with the find method:

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
    arr.find(_.getAs[String]("name") == columnName) match {
        case Some(i) => i.getAs[String]("value")
        case None => null
    }
}
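The Some/None match can be collapsed further with map and orNull. A minimal sketch of the same lookup logic on plain Scala data (Field here is a hypothetical stand-in for Spark's Row, just to illustrate the pattern outside a Spark session):

```scala
// Hypothetical stand-in for the name/value structs inside the Data array.
case class Field(name: String, value: String)

// Same logic as the UDF body: value of the first element whose
// name matches, or null when no element matches.
def findValue(arr: Seq[Field], columnName: String): String =
  arr.find(_.name == columnName).map(_.value).orNull

val row = Seq(Field("FName", "Alex"), Field("LName", "Strong"))
findValue(row, "FName") // "Alex"
findValue(row, "MName") // null
```

Since value is typed as String, orNull is safe here and avoids spelling out the match.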

Upvotes: 6

Som

Reputation: 6323

Perhaps this is helpful:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }, {
      |"name": "LName",   "value": "Marley"  }]}""".stripMargin
)

// toDS() needs the session implicits in scope
import spark.implicits._

val df = spark.read.json(data.toDS())
df.show(false)
df.printSchema()

/**
  * +----------------------------------------------------+
  * |Data                                                |
  * +----------------------------------------------------+
  * |[[FName, Alex], [LName, Strong]]                    |
  * |[[FName, Robert ], [MName, Nesta ], [LName, Marley]]|
  * +----------------------------------------------------+
  *
  * root
  * |-- Data: array (nullable = true)
  * |    |-- element: struct (containsNull = true)
  * |    |    |-- name: string (nullable = true)
  * |    |    |-- value: string (nullable = true)
  */

df.selectExpr("inline_outer(Data)")
    .groupBy()
    .pivot("name")
    .agg(collect_list("value"))
    .withColumn("x", arrays_zip($"FName", $"LName"))
    .selectExpr("inline_outer(x)")
    .show(false)

/**
  * +-------+------+
  * |FName  |LName |
  * +-------+------+
  * |Alex   |Strong|
  * |Robert |Marley|
  * +-------+------+
  */

Upvotes: 2
