Reputation: 677
I have a JSON string:
{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}
I wish to create a DataFrame from it with the following columns:
rating_text | rating_color | votes | aggregate_rating
When I code it as:
val pdf = json.select("user_rating")
I get only one column, user_rating.
I tried this most voted solution but pdf.show() still gives me only the user_rating column.
I am not sure how Solution1 works exactly, and I am unable to access columns by index as suggested in Solution2; I get a NoSuchColumn error.
What is the best approach to extract the keys (rating_text, rating_color, ...) and use them as columns in a DataFrame?
The language I am using: Scala
I tried the below way to iterate over each Row in the DataFrame and parse it by getting columns:
val pdf = json.select("restaurants.restaurant.user_rating")
pdf.map{ row => row.getStruct(0).getString(0) }.show()
I get the below exception at the map function:
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row
Upvotes: 3
Views: 8018
Reputation: 29237
Any level of nested data can be un-nested into a flat DataFrame using this kind of approach. The logic is as below:
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * expand_nested_column: expands one level of nesting by exploding
 * ArrayType columns and flattening StructType columns.
 *
 * @param json_data_df_temp the DataFrame to un-nest
 * @return [[DataFrame]] with one level of nesting removed
 */
def expand_nested_column(json_data_df_temp: DataFrame): DataFrame = {
  var json_data_df: DataFrame = json_data_df_temp
  var select_clause_list = List.empty[String]
  // Iterate over each column to check whether it still contains nested JSON
  for (column_name <- json_data_df.schema.names) {
    println("Outside isinstance loop: " + column_name)
    // Check whether the column type is ArrayType
    if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]) {
      println("Inside isinstance loop: " + column_name)
      // Extract nested JSON elements using the explode function
      json_data_df = json_data_df.withColumn(column_name, explode(json_data_df(column_name)).alias(column_name))
      select_clause_list :+= column_name
    }
    else if (json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
      println("Inside isinstance loop of StructType: " + column_name)
      for (field <- json_data_df.schema(column_name).dataType.asInstanceOf[StructType].fields) {
        select_clause_list :+= column_name + "." + field.name
      }
    }
    else {
      select_clause_list :+= column_name
    }
  }
  // Select the collected columns, renaming "parent.child" to "parent_child"
  val columnNames = select_clause_list.map(name => col(name).alias(name.replace('.', '_')))
  json_data_df.select(columnNames: _*)
}
import spark.implicits._

var json_data_df = spark.read.json(Seq(
  """{"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}"""
).toDS())
json_data_df.show(10, false)
json_data_df.printSchema()

// Process the nested structure: run the while loop until nested_column_count is zero
var nested_column_count = 1
while (nested_column_count != 0) {
  println("Printing nested_column_count: " + nested_column_count)
  var nested_column_count_temp = 0
  // Count the columns that still hold nested (array or struct) data
  for (column_name <- json_data_df.schema.names) {
    println("Iterating DataFrame Columns: " + column_name)
    if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]
      || json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
      nested_column_count_temp += 1
    }
  }
  if (nested_column_count_temp != 0) {
    json_data_df = expand_nested_column(json_data_df)
    json_data_df.show(100, false)
  }
  println("Printing nested_column_count_temp: " + nested_column_count_temp)
  nested_column_count = nested_column_count_temp
}
json_data_df.show(100, false)
json_data_df.printSchema()
Result:
+-----------------------------+
|user_rating |
+-----------------------------+
|[4.5, 3F7E00, Excellent, 778]|
+-----------------------------+
root
|-- user_rating: struct (nullable = true)
| |-- aggregate_rating: string (nullable = true)
| |-- rating_color: string (nullable = true)
| |-- rating_text: string (nullable = true)
| |-- votes: string (nullable = true)
Printing nested_column_count: 1
Iterating DataFrame Columns: user_rating
Outside isinstance loop: user_rating
Inside isinstance loop of StructType: user_rating
+----------------------------+------------------------+-----------------------+-----------------+
|user_rating_aggregate_rating|user_rating_rating_color|user_rating_rating_text|user_rating_votes|
+----------------------------+------------------------+-----------------------+-----------------+
|4.5 |3F7E00 |Excellent |778 |
+----------------------------+------------------------+-----------------------+-----------------+
Upvotes: 0
Reputation: 677
I found the workaround. What I was looking for was the explode function, which returns a new Row for each element in the column.
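As a minimal sketch of that idea (the file path and the top-level restaurants array are assumptions, inferred from the select("restaurants.restaurant.user_rating") in my question):
import org.apache.spark.sql.functions.explode
import spark.implicits._

// Hypothetical input file, one JSON object per line:
// {"restaurants": [{"restaurant": {"user_rating": {...}}}, ...]}
val df = spark.read.json("path to restaurants.json")

// explode produces one Row per element of the array column ...
val flattened = df
  .select(explode($"restaurants").alias("r"))
  // ... and the struct fields can then be selected as ordinary columns
  .select($"r.restaurant.user_rating.*")

flattened.show()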
Upvotes: 1
Reputation: 2178
You can parse a column containing a JSON string and build a DataFrame containing all the columns within the JSON. Here is an example -
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val jsonData = """{"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}"""
val schema = StructType(
  List(
    StructField("rating_text", StringType, nullable = false),
    StructField("rating_color", StringType, nullable = false),
    StructField("votes", StringType, nullable = false),
    StructField("aggregate_rating", StringType, nullable = false)
  ))
val df = spark.createDataset(Seq(jsonData)).toDF("user_rating")
val dfWithParsedJson = df.withColumn("json_data", from_json($"user_rating", schema))
dfWithParsedJson.select($"user_rating", $"json_data.rating_text", $"json_data.rating_color", $"json_data.votes", $"json_data.aggregate_rating").show()
Result -
+--------------------+-----------+------------+-----+----------------+
| user_rating|rating_text|rating_color|votes|aggregate_rating|
+--------------------+-----------+------------+-----+----------------+
|{"rating_text": "...| Excellent| 3F7E00| 778| 4.5|
+--------------------+-----------+------------+-----+----------------+
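A small follow-up (standard Spark behaviour, not specific to this answer): instead of listing every field by hand, all fields of the parsed struct column can be expanded at once with the .* selector.
// Equivalent to the explicit field list above
dfWithParsedJson.select($"user_rating", $"json_data.*").show()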
If the JSON is in a file, then you can simply read it by
//file contains - {"user_rating": {"rating_text": "Excellent", "rating_color": "3F7E00", "votes": "778", "aggregate_rating": "4.5"}}
val df = spark.read.json("path to test.txt")
df.select("user_rating.rating_text").show()
You can read data from the Row object using an index, like:
df.map{ row =>
(row.getStruct(0).getString(0))
}.show()
// getStruct(index) is used because the data type is a complex (struct) class; for ordinary values you can use getString, getLong, etc.
I will highly recommend using a schema to read and operate on JSON. This will save you tons of runtime errors and it is much faster.
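A minimal sketch of that recommendation, reusing the schema value defined above (the file path is a placeholder):
// Supplying the schema up front skips Spark's schema-inference pass over the file
// and fails fast if the data does not match the expected shape
val ratingSchema = StructType(List(
  StructField("user_rating", schema, nullable = true)
))
val dfWithSchema = spark.read.schema(ratingSchema).json("path to test.txt")
dfWithSchema.select($"user_rating.rating_text").show()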
Upvotes: 2