Reputation: 971
I am attempting to add a column containing a List[Annotation] to a Spark DataFrame using the code below (I've reformatted everything so it can be reproduced by copying and pasting directly).
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
case class Annotation(
  field1: String,
  field2: String,
  field3: Int,
  field4: Float,
  field5: Int,
  field6: List[Mapping]
)

case class Mapping(
  fieldA: String,
  fieldB: String,
  fieldC: String,
  fieldD: String,
  fieldE: String
)
object StructTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // StructType mirroring the Annotation case class
    val annotationStruct =
      StructType(
        Array(
          StructField("field1", StringType, nullable = true),
          StructField("field2", StringType, nullable = true),
          StructField("field3", IntegerType, nullable = false),
          StructField("field4", FloatType, nullable = false),
          StructField("field5", IntegerType, nullable = false),
          StructField(
            "field6",
            ArrayType(
              StructType(Array(
                StructField("fieldA", StringType, nullable = true),
                StructField("fieldB", StringType, nullable = true),
                StructField("fieldC", StringType, nullable = true),
                StructField("fieldD", StringType, nullable = true),
                StructField("fieldE", StringType, nullable = true)
              ))),
            nullable = true
          )
        )
      )

    val df = List(1).toDF
    val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))

    // Append the annotations column to the existing schema and rows
    val schema = df.schema
    val newSchema = schema.add("annotations", ArrayType(annotationStruct), false)
    val rdd = df.rdd.map(x => Row.fromSeq(x.toSeq :+ List(annotation)))
    val newDF = spark.createDataFrame(rdd, newSchema)
    newDF.printSchema
    newDF.show
  }
}
However, I'm getting an error when running this code.
Caused by: java.lang.RuntimeException: Annotation is not a valid external type for schema of struct<field1:string,field2:string,field3:int,field4:float,field5:int,field6:array<struct<fieldA:string,fieldB:string,fieldC:string,fieldD:string,fieldE:string>>>
The schema I am passing in (ArrayType(annotationStruct)) appears to be of the incorrect form when creating a DataFrame using createDataFrame, but it seems to match the schemas of DataFrames that contain only List[Annotation].
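For context, here's a minimal sketch of the only workaround I can see (the mappingToRow and annotationToRow helpers are names I've made up, not library functions): when createDataFrame is given an explicit StructType, it seems to expect each struct value as a Row and each array value as a Seq, rather than a case class instance.
// Hand-convert the case classes into the external types the schema expects.
def mappingToRow(m: Mapping): Row =
  Row(m.fieldA, m.fieldB, m.fieldC, m.fieldD, m.fieldE)

def annotationToRow(a: Annotation): Row =
  Row(a.field1, a.field2, a.field3, a.field4, a.field5, a.field6.map(mappingToRow))

// Wrapping each Annotation in a Row makes the failing line pass.
val rddAsRows = df.rdd.map(x => Row.fromSeq(x.toSeq :+ List(annotation).map(annotationToRow)))
spark.createDataFrame(rddAsRows, newSchema).show(false)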
Edit: Example of modifying a DF schema in this fashion with a simple type instead of a case class.
val df = List(1).toDF
spark.createDataFrame(
  df.rdd.map(x => Row.fromSeq(x.toSeq :+ "moose")),
  df.schema.add("moose", StringType, false)
).show
+-----+-----+
|value|moose|
+-----+-----+
| 1|moose|
+-----+-----+
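For contrast (the names here are made up for illustration), the same pattern also works for a struct column when the value is supplied as a Row whose shape matches the added StructType:
val structSchema = StructType(Array(StructField("a", StringType, nullable = true)))
spark.createDataFrame(
  df.rdd.map(x => Row.fromSeq(x.toSeq :+ Row("antler"))),  // a Row, not a case class
  df.schema.add("moose2", structSchema, false)
).show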
Edit 2: I've pared this down a bit more. Sadly, I don't have the option of creating a DataFrame directly from the case class, which is why I am trying to mirror it as a struct using ScalaReflection. In this case, I am not altering a previous schema, just attempting to create a DataFrame from an RDD of Rows which contain lists of my case class. Spark 1.6 had an issue that affected parsing arrays of structs which may be empty or null; I'm wondering if these are linked.
import org.apache.spark.sql.catalyst.ScalaReflection

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val annotationSchema = ScalaReflection.schemaFor[Annotation].dataType.asInstanceOf[StructType]
val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))
val testRDD = spark.sparkContext.parallelize(List(List(annotation))).map(x => Row(x))
val testSchema = StructType(
  Array(StructField("annotations", ArrayType(annotationSchema), false))
)
spark.createDataFrame(testRDD, testSchema).show
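If those issues are linked, the same hand conversion (reusing the hypothetical annotationToRow helper sketched earlier) would presumably apply to this reduced case too:
val testRDDAsRows = spark.sparkContext
  .parallelize(List(List(annotation)))
  .map(x => Row(x.map(annotationToRow)))  // List[Row] for the array-of-struct column
spark.createDataFrame(testRDDAsRows, testSchema).show(false)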
Upvotes: 1
Views: 1482
Reputation: 41957
If you are concerned with adding a complex column to an existing DataFrame, then the following solution should work for you.
val df = List(1).toDF
val annotation = sc.parallelize(List(Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))))
val newDF = df.rdd.zip(annotation).map(x => Merged(x._1.get(0).asInstanceOf[Int], x._2)).toDF
newDF.printSchema
newDF.show(false)
which should give you
root
|-- value: integer (nullable = false)
|-- annotations: struct (nullable = true)
| |-- field1: string (nullable = true)
| |-- field2: string (nullable = true)
| |-- field3: integer (nullable = false)
| |-- field4: float (nullable = false)
| |-- field5: integer (nullable = false)
| |-- field6: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- fieldA: string (nullable = true)
| | | |-- fieldB: string (nullable = true)
| | | |-- fieldC: string (nullable = true)
| | | |-- fieldD: string (nullable = true)
| | | |-- fieldE: string (nullable = true)
+-----+---------------------------------------+
|value|annotations |
+-----+---------------------------------------+
|1 |[1,2,1,0.5,1,WrappedArray([a,b,c,d,e])]|
+-----+---------------------------------------+
The case classes used are the same as yours, with an additional Merged case class:
case class Merged(value : Int, annotations: Annotation)
case class Annotation(field1: String, field2: String, field3: Int, field4: Float, field5: Int, field6: List[Mapping])
case class Mapping(fieldA: String, fieldB: String, fieldC: String, fieldD: String, fieldE: String)
When case classes are used, we don't need to define the schema at all. Note that the way column names are generated differs between using case classes and passing an explicit schema to sqlContext.createDataFrame.
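To illustrate that difference, a minimal sketch (assuming a SparkSession named spark with spark.implicits._ in scope): the encoder route accepts the case class instances directly and infers the nested schema, whereas createDataFrame(rdd, schema) requires Row objects matching the declared schema.
import spark.implicits._

// Schema is inferred from the case class encoders; no StructType is declared.
val viaEncoder = Seq(
  Merged(1, Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e"))))
).toDF
viaEncoder.printSchema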
Upvotes: 1