Reputation: 1085
Using Spark (Scala 2.11), I have the following Dataset (read from a Cassandra table):
+-----------+------------------------------------------------------+
|id         |attributes                                            |
+-----------+------------------------------------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|
+-----------+------------------------------------------------------+
This is the printSchema():
root
|-- id: string (nullable = true)
|-- attributes: string (nullable = true)
The attributes column is a JSON string containing an array of objects. I'm trying to explode it into a Dataset but keep failing. I tried to define the schema as follows:
StructType type = new StructType()
.add("id", new IntegerType(), false)
.add("name", new StringType(), false)
.add("score", new FloatType(), false)
.add("snippets", new IntegerType(), false );
ArrayType schema = new ArrayType(type, false);
And provide it to from_json as follows:
df = df.withColumn("val", functions.from_json(df.col("attributes"), schema));
This fails with a MatchError:
Exception in thread "main" scala.MatchError: org.apache.spark.sql.types.IntegerType@43756cb (of class org.apache.spark.sql.types.IntegerType)
What's the correct way to do that?
Upvotes: 0
Views: 1382
Reputation: 32640
You can specify the schema this way (Scala):
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = ArrayType(
  StructType(Array(
    StructField("id", IntegerType, false),
    StructField("name", StringType, false),
    StructField("score", FloatType, false),
    StructField("snippets", IntegerType, false)
  )),
  false
)

val df1 = df.withColumn("val", from_json(col("attributes"), schema))
df1.show(false)
//+-----------+------------------------------------------------------+------------------------+
//|id |attributes |val |
//+-----------+------------------------------------------------------+------------------------+
//|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
//+-----------+------------------------------------------------------+------------------------+
Or for Java:
import static org.apache.spark.sql.types.DataTypes.*;
import java.util.Arrays;
import org.apache.spark.sql.types.ArrayType;

// Use the DataTypes factory methods and singletons; createArrayType returns
// an ArrayType, and snippets is an int in the JSON.
ArrayType schema = createArrayType(createStructType(Arrays.asList(
    createStructField("id", IntegerType, false),
    createStructField("name", StringType, false),
    createStructField("score", FloatType, false),
    createStructField("snippets", IntegerType, false)
)), false);
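Since the original goal was to explode the parsed array, here is a minimal follow-up sketch in Scala, reusing the df1 from above (the attr and attr_id names are just illustrative):
import org.apache.spark.sql.functions.{col, explode}

// One row per element of the parsed array, then flatten the struct fields.
val exploded = df1
  .withColumn("attr", explode(col("val")))
  .select(
    col("id"),
    col("attr.id").as("attr_id"), // renamed to avoid clashing with the top-level id
    col("attr.name"),
    col("attr.score"),
    col("attr.snippets")
  )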
Upvotes: 2
Reputation: 42332
You can define the schema as a literal string instead:
import org.apache.spark.sql.functions.{from_json, lit}

val df2 = df.withColumn(
  "val",
  from_json(
    df.col("attributes"),
    lit("array<struct<id: int, name: string, score: float, snippets: int>>")
  )
)
df2.show(false)
+-----------+------------------------------------------------------+------------------------+
|id |attributes |val |
+-----------+------------------------------------------------------+------------------------+
|YH8B135U123|[{"id":1,"name":"function","score":10.0,"snippets":1}]|[[1, function, 10.0, 1]]|
+-----------+------------------------------------------------------+------------------------+
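Note that the from_json overload taking a Column schema only exists in more recent Spark releases (Spark 3.0+, as far as I know); on older versions you can parse the same DDL string up front instead (a sketch, assuming Spark 2.4+ for DataType.fromDDL):
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.DataType

// Parse the DDL string into a DataType once, then pass it to from_json directly.
val ddlSchema = DataType.fromDDL("array<struct<id: int, name: string, score: float, snippets: int>>")
val df2 = df.withColumn("val", from_json(df.col("attributes"), ddlSchema))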
If you prefer to build the schema programmatically:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark_struct = new StructType()
  .add("id", IntegerType, false)
  .add("name", StringType, false)
  .add("score", FloatType, false)
  .add("snippets", IntegerType, false)
val schema = new ArrayType(spark_struct, false)

val df2 = df.withColumn(
  "val",
  from_json(
    df.col("attributes"),
    schema
  )
)
Two problems with your original code: (1) type is a reserved keyword in Scala, so it's best avoided as a variable name, and (2) the scala.MatchError is caused by new IntegerType() and friends: Spark pattern-matches schemas against the singleton type objects (IntegerType in Scala, DataTypes.IntegerType in Java), and a freshly constructed instance never matches them, so pass the singletons instead of creating new instances.
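To see why the fresh instances fail, here is a rough, illustrative sketch of the kind of match Spark performs internally (not Spark's actual code):
import org.apache.spark.sql.types._

// Patterns like case IntegerType compare against the singleton case object,
// so an instance created with new (which the Java API happens to allow)
// never matches and falls through to a scala.MatchError at runtime.
def ddlName(dt: DataType): String = dt match {
  case IntegerType => "int"
  case StringType  => "string"
  case FloatType   => "float"
}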
Upvotes: 1