Harshvardhan Solanki

Reputation: 677

How to select keys from a JSON object (complex data type) in a Spark DataFrame

I have a JSON string:

{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}

I wish to create a DataFrame out of it with the columns:

rating_text | rating_color | votes | aggregate_rating

When I write:

val pdf = json.select("user_rating")

I get only one column, user_rating.

I tried the most voted solution from a similar question (Solution 1), but pdf.show() still gives me only the user_rating column. I'm not sure how Solution 1 works exactly.

I am also unable to access columns by index as shown in Solution 2; I get a NoSuchColumn error.


What is the best approach to extract the keys (rating_text, rating_color, ...) and use them as columns in a DataFrame?

Language I am using: Scala


I tried the following way to iterate over each Row in the DataFrame and parse it by getting columns:

val pdf = json.select("restaurants.restaurant.user_rating")
pdf.map { row => row.getStruct(0).getString(0) }.show()

I get the below exception at the map function:

 java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row

Upvotes: 3

Views: 8018

Answers (3)

Ram Ghadiyaram

Reputation: 29237


Any level of nested data can be un-nested into a flat DataFrame using this kind of approach.


The logic is as below:

  1. Identify the nesting level, checking for either array or struct types.
  2. Loop through the nesting levels and flatten them as shown below.

import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * expand_nested_column : explodes array columns and expands struct columns
 * by one level, returning a flatter DataFrame.
 *
 * @param json_data_df_temp the DataFrame to flatten by one level
 * @return [[DataFrame]]
 */
def expand_nested_column(json_data_df_temp: DataFrame): DataFrame = {
  var json_data_df: DataFrame = json_data_df_temp
  var select_clause_list = List.empty[String]

  // Iterate over each column to check whether any nested JSON data exists
  for (column_name <- json_data_df.schema.names) {
    println("Outside isinstance loop: " + column_name)

    // Check whether the column type is ArrayType
    if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]) {
      println("Inside isinstance loop: " + column_name)

      // Extract the nested JSON columns/data using the explode function
      json_data_df = json_data_df.withColumn(column_name, explode(json_data_df(column_name)).alias(column_name))
      select_clause_list :+= column_name
    }
    else if (json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
      println("Inside isinstance loop of StructType: " + column_name)
      // Select each field of the struct individually
      for (field <- json_data_df.schema(column_name).dataType.asInstanceOf[StructType].fields) {
        select_clause_list :+= column_name + "." + field.name
      }
    }
    else {
      select_clause_list :+= column_name
    }
  }

  val columnNames = select_clause_list.map(name => col(name).alias(name.replace('.', '_')))

  // Select the flattened columns collected in select_clause_list from json_data_df
  json_data_df.select(columnNames: _*)
}



// assumes an existing SparkSession named spark
import spark.implicits._

var json_data_df = spark.read.json(
  Seq("""{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}""").toDS()
)
json_data_df.show(10, false)
json_data_df.printSchema()

// Process the nested structure
var nested_column_count = 1
// Run the while loop until nested_column_count is zero (0)
while (nested_column_count != 0) {
  println("Printing nested_column_count: " + nested_column_count)

  var nested_column_count_temp = 0
  // Iterate over each column again to check whether any nested JSON data exists
  for (column_name <- json_data_df.schema.names) {
    println("Iterating DataFrame Columns: " + column_name)
    // Check whether the column type is ArrayType or StructType
    if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]
      || json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
      nested_column_count_temp += 1
    }
  }
  if (nested_column_count_temp != 0) {
    json_data_df = expand_nested_column(json_data_df)
    json_data_df.show(100, false)
  }
  println("Printing nested_column_count_temp: " + nested_column_count_temp)
  nested_column_count = nested_column_count_temp
}

json_data_df.show(100, false)
json_data_df.printSchema()

Result:

+-----------------------------+
|user_rating                  |
+-----------------------------+
|[4.5, 3F7E00, Excellent, 778]|
+-----------------------------+

root
 |-- user_rating: struct (nullable = true)
 |    |-- aggregate_rating: string (nullable = true)
 |    |-- rating_color: string (nullable = true)
 |    |-- rating_text: string (nullable = true)
 |    |-- votes: string (nullable = true)

Printing nested_column_count: 1
Iterating DataFrame Columns: user_rating
Outside isinstance loop: user_rating
Inside isinstance loop of StructType: user_rating
+----------------------------+------------------------+-----------------------+-----------------+
|user_rating_aggregate_rating|user_rating_rating_color|user_rating_rating_text|user_rating_votes|
+----------------------------+------------------------+-----------------------+-----------------+
|4.5                         |3F7E00                  |Excellent              |778              |
+----------------------------+------------------------+-----------------------+-----------------+

Upvotes: 0

Harshvardhan Solanki

Reputation: 677

I found the workaround. Basically, what I was looking for was the explode function, which returns a new row for each element in the given array column.
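For reference, a minimal sketch of how that looks against the select path from the question (assuming, per the ClassCastException above, that restaurants is an array, so the extracted user_rating column is an array of structs; the alias ur is just for illustration):

    import org.apache.spark.sql.functions.{col, explode}

    // extracting a field through an array yields an array of user_rating
    // structs; explode turns that into one row per struct
    val ratings = json.select(explode(col("restaurants.restaurant.user_rating")).alias("ur"))

    // the struct fields can then be selected directly by path
    ratings.select("ur.rating_text", "ur.rating_color", "ur.votes", "ur.aggregate_rating").show()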

Upvotes: 1

Salim

Reputation: 2178

You can parse a column containing a JSON string and build a DataFrame containing all the columns within the JSON. Here is an example -

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import spark.implicits._

    val jsonData = """{"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}"""

    val schema = StructType(
      List(
        StructField("rating_text", StringType, nullable = false),
        StructField("rating_color", StringType, nullable = false),
        StructField("votes", StringType, nullable = false),
        StructField("aggregate_rating", StringType, nullable = false)
      ))

    val df = spark.createDataset(Seq(jsonData)).toDF("user_rating")
    val dfWithParsedJson = df.withColumn("json_data", from_json($"user_rating", schema))

    dfWithParsedJson.select($"user_rating", $"json_data.rating_text", $"json_data.rating_color", $"json_data.votes", $"json_data.aggregate_rating").show()

Result -

+--------------------+-----------+------------+-----+----------------+
|         user_rating|rating_text|rating_color|votes|aggregate_rating|
+--------------------+-----------+------------+-----+----------------+
|{"rating_text": "...|  Excellent|      3F7E00|  778|             4.5|
+--------------------+-----------+------------+-----+----------------+

If the JSON is in a file, then you can simply read it with:

    //file contains - {"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}
    val df = spark.read.json("path to test.txt")
    df.select("user_rating.rating_text").show()

You can read data from the Row object by index, like:

    df.map { row =>
      row.getStruct(0).getString(0)
    }.show()

    // getStruct(index) is used because the column is a complex (struct) type;
    // for ordinary values you can use getString, getLong, etc.

I highly recommend using a schema to read and operate on JSON. It will save you tons of runtime errors and is much faster.
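For example, here is a minimal sketch that reuses the schema defined above, wrapped to match the file's top-level user_rating key, so Spark skips schema inference when reading:

    import org.apache.spark.sql.types.{StructField, StructType}

    // wrap the inner schema so it matches {"user_rating": {...}} in the file
    val fileSchema = StructType(List(StructField("user_rating", schema, nullable = true)))

    // supplying the schema avoids an extra pass over the data for inference
    val dfTyped = spark.read.schema(fileSchema).json("path to test.txt")
    dfTyped.select("user_rating.*").show()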

Upvotes: 2
