Reputation: 556
I am trying to add a new column to each row of a DataFrame like this:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def addNamespace(iter: Iterator[Row]): Iterator[Row] = {
  iter.map(row => {
    println(row.getString(0))
    // Row.fromSeq(row.toSeq ++ Array[String]("shared"))
    val newseq = row.toSeq ++ Array[String]("shared")
    Row(newseq: _*)
  })
  iter
}

def transformDf(source: DataFrame)(implicit spark: SparkSession): DataFrame = {
  val newSchema = StructType(source.schema.fields ++ Array(StructField("namespace", StringType, nullable = true)))
  val df = spark.sqlContext.createDataFrame(source.rdd.mapPartitions(addNamespace), newSchema)
  df.show()
  df
}
But I keep getting this error on the line df.show():
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
Can somebody please help me figure this out? I have searched through multiple posts, but everything I have tried gives me this error.
I have also tried
val again = sourceDF.withColumn("namespace", functions.lit("shared"))
but it has the same issue.
Schema of the data that was already read:
root
 |-- name: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- activates_on: timestamp (nullable = true)
 |    |-- expires_on: timestamp (nullable = true)
 |    |-- created_by: string (nullable = true)
 |    |-- created_on: timestamp (nullable = true)
 |    |-- updated_by: string (nullable = true)
 |    |-- updated_on: timestamp (nullable = true)
 |    |-- properties: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
Upvotes: 2
Views: 1649
Reputation: 29155
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
means that Spark is unable to interpret the value as a string type for the newly added "namespace" column.
It clearly indicates a data type mismatch at the Catalyst level. See the Spark code here:
override def eval(input: InternalRow): Any = {
  val result = child.eval(input)
  if (checkType(result)) {
    result
  } else {
    throw new RuntimeException(s"${result.getClass.getName}$errMsg")
  }
}
and the error message (errMsg) is s" is not a valid external type for schema of ${expected.catalogString}", with the class name of the offending value prepended to it.
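A simplified model of that check, for the string case only, shows why the exception fires. This is a sketch, not Spark's actual code: the object and method names are hypothetical, and it assumes (as in the Spark code above) that the external class expected for a string column is java.lang.String.

```scala
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical, simplified model of the StringType branch of Spark's
// external type validation: the value in a Row must be a java.lang.String.
object ExternalTypeCheck {
  def isValidExternalString(value: Any): Boolean =
    value.isInstanceOf[String]

  def main(args: Array[String]): Unit = {
    val external = "shared"                        // what a Row is expected to hold
    val internal = UTF8String.fromString("shared") // Spark's internal string encoding
    println(isValidExternalString(external)) // true
    println(isValidExternalString(internal)) // false: this is the RuntimeException case
  }
}
```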
So UTF8String, Spark's internal string representation, is not a real java.lang.String. You need to encode/decode it before passing it as a string type; otherwise Catalyst will not be able to understand what you are passing.
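A minimal sketch of that conversion, assuming the offending values are UTF8String leaves inside otherwise external containers (Row, Seq, Map). ToExternalString and toExternal are hypothetical names; UTF8String.toString is the call that decodes the stored UTF-8 bytes into a java.lang.String.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: recursively replace UTF8String with java.lang.String
// so a Row matches a StructType that declares StringType columns.
// Assumption: containers are already external (Row, Seq, Map), not InternalRow/MapData.
object ToExternalString {
  def toExternal(value: Any): Any = value match {
    case s: UTF8String => s.toString // decode the UTF-8 bytes into a java.lang.String
    case r: Row        => Row.fromSeq(r.toSeq.map(toExternal))
    case m: scala.collection.Map[_, _] =>
      m.map { case (k, v) => toExternal(k) -> toExternal(v) }
    case xs: scala.collection.Seq[_] => xs.map(toExternal)
    case other         => other // numbers, timestamps, nulls pass through unchanged
  }
}
```

For example, toExternal(Row(UTF8String.fromString("shared"), 1)) yields a Row whose first field is the plain string "shared".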
How to fix it?
Below is the Stack Overflow content that addresses how to encode/decode between UTF-8 and String, and vice versa. You may need to apply a suitable solution for your case.
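Applied to the code in the question, a fix might look like the sketch below. It is untested against the asker's data and assumes only top-level values need converting; AddNamespaceSketch and externalize are hypothetical names. It also returns the iterator produced by iter.map, because the original addNamespace discards that result and returns the unmodified iter, so its rows would not match newSchema.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object AddNamespaceSketch {
  // Hypothetical helper: decode internal UTF8String values to java.lang.String (top level only).
  private def externalize(value: Any): Any = value match {
    case s: UTF8String => s.toString
    case other         => other
  }

  // Append "shared" to every row and return the *mapped* iterator,
  // so each row has one more field, matching newSchema below.
  def addNamespace(iter: Iterator[Row]): Iterator[Row] =
    iter.map(row => Row.fromSeq(row.toSeq.map(externalize) :+ "shared"))

  def transformDf(source: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val newSchema = StructType(
      source.schema.fields :+ StructField("namespace", StringType, nullable = true))
    spark.createDataFrame(source.rdd.mapPartitions(addNamespace), newSchema)
  }
}
```

Calling spark.createDataFrame instead of spark.sqlContext.createDataFrame is only a stylistic choice; both accept an RDD[Row] plus a StructType.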
String decode UTF-8: https://stackoverflow.com/a/5943395/647053
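The linked answer is about decoding UTF-8 in Java. The same idea in Scala, using only the JDK, is sketched below; Utf8RoundTrip is a hypothetical name. UTF8String.toString performs essentially this decoding on the bytes it stores.

```scala
import java.nio.charset.StandardCharsets

// Sketch: String -> UTF-8 bytes -> String round-trips without loss.
object Utf8RoundTrip {
  def encode(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
  def decode(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val original = "shared \u00fc" // 8 characters; the last one needs two bytes in UTF-8
    val bytes = encode(original)
    println(bytes.length)              // 9
    println(decode(bytes) == original) // true
  }
}
```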
Note: this online UTF-8 encoder/decoder tool is very handy for pasting sample data and converting it to a string. Try this first.
Upvotes: 0