Reputation: 63022
For the following code, in which a DataFrame is converted to RDD[Row] and data for a new column is appended via mapPartitions:
// df is a DataFrame
val dfRdd = df.rdd.mapPartitions {
  val bfMap = df.rdd.sparkContext.broadcast(factorsMap)
  iter =>
    val locMap = bfMap.value
    iter.map { r =>
      val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
      Row(newseq)
    }
}
The output is correct for the RDD[Row] with the additional column:
println("**dfrdd\n" + dfRdd.take(5).mkString("\n"))
**dfrdd
[ArrayBuffer(0021BEC286CC, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 148818)]
[ArrayBuffer(0021BEE7C556, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 26908)]
[ArrayBuffer(8C7F3BFD4B82, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 99942)]
[ArrayBuffer(0021BEC8F8B8, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 53994)]
[ArrayBuffer(10EA59F10C8B, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 1427)]
Let us try to convert the RDD[Row] back to a DataFrame:
val newSchema = df.schema.add(StructField("userf", IntegerType))
Now let us create the updated DataFrame:
val df2 = df.sqlContext.createDataFrame(dfRdd, newSchema)
Does the new schema look correct?
newSchema.printTreeString()
root
 |-- user: string (nullable = true)
 |-- score: long (nullable = true)
 |-- programType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- item: string (nullable = true)
 |-- playType: string (nullable = true)
 |-- userf: integer (nullable = true)
Notice we do see the new userf column. However, it does not work:
println("df2: " + df2.take(1))
Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost, executor driver): java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: scala.collection.mutable.ArrayBuffer is not a
valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true) AS user#28
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 0
:- null
So: what detail is missing here?
Note: I am not interested in different approaches (e.g. withColumn or Datasets); let us please consider only the RDD[Row] approach shown above.
Upvotes: 2
Views: 2369
Reputation: 37822
There seems to be a small mistake calling Row's constructor:
val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
Row(newseq)
The signature of this "constructor" (apply method, actually) is:
def apply(values: Any*): Row
When you pass a Seq[Any], it is treated as a single value of type Seq[Any]. You want to pass the elements of this sequence, therefore you should use:
val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
Row(newseq: _*)
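For illustration, a quick sketch of the difference (the values here are hypothetical):
import org.apache.spark.sql.Row

val seq = Seq("0021BEC286CC", 4, 148818)
Row(seq).length      // 1 -- a single field holding the whole Seq
Row(seq: _*).length  // 3 -- one field per element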
Once this is fixed, the Rows will match the schema you built, and you'll get the expected result.
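For completeness, a minimal sketch of the corrected pipeline, reusing df, factorsMap and inColName as defined in the question (the broadcast is hoisted onto the driver here, which is equivalent to the block form in the question):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField}

val bcMap = df.rdd.sparkContext.broadcast(factorsMap)
val dfRdd = df.rdd.mapPartitions { iter =>
  val locMap = bcMap.value
  // expand the Seq into varargs so each element becomes its own field
  iter.map(r => Row(r.toSeq :+ locMap(r.getAs[String](inColName)): _*))
}
val newSchema = df.schema.add(StructField("userf", IntegerType))
val df2 = df.sqlContext.createDataFrame(dfRdd, newSchema)
df2.show(5) // now decodes cleanly against newSchema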
Upvotes: 4