schengalath

Reputation: 11

UnaryTransformer instance throwing ClassCastException

I have a requirement to create my own UnaryTransformer instance that accepts a DataFrame column of type Array[String] and outputs the same type. In trying to do so, I encountered a ClassCastException on Spark 2.1.0. I've put together a sample test that reproduces my case.

import org.apache.spark.SparkConf
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class MyTransformer(override val uid: String) extends UnaryTransformer[Array[String], Array[String], MyTransformer] {
  override protected def createTransformFunc: Array[String] => Array[String] = { param1 =>
    param1.foreach(println)
    param1
  }

  override protected def outputDataType: DataType = ArrayType(StringType)

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == ArrayType(StringType), s"Data type mismatch between Array[String] and provided type $inputType.")
  }

  def this() = this( Identifiable.randomUID("tester") )
}


object Tester {

  def main(args: Array[String]): Unit = {

    val config = new SparkConf().setAppName("Tester")

    implicit val sparkSession = SparkSession.builder().config(config).getOrCreate()
    import sparkSession.implicits._

    val dataframe = Seq(
      Array("Firstly", "F1"), Array("Driving", "S1"), Array("Ran", "T3"),
      Array("Fourth", "F4"), Array("Running", "F5"), Array("Gone", "S6")
    ).toDF("input")

    val transformer = new MyTransformer().setInputCol("input").setOutputCol("output")

    val transformed = transformer.transform(dataframe)

    transformed.select("output").show()

    println("Complete....")

    sparkSession.close()
  }

}

I'm attaching the stack trace for reference:

Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (array) => array)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1078)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$21.applyOrElse(Optimizer.scala:1073)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1073)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1072)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  at Tester$.main(Tester.scala:45)
  at Tester.main(Tester.scala)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
  at MyTransformer$$anonfun$createTransformFunc$1.apply(Tester.scala:9)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:89)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1069)
  ... 53 more

Upvotes: 0

Views: 413

Answers (1)

user7998996

Reputation: 11

An ArrayType column is handed to the transform function as a Seq, not an Array, so the function has to take and return Seq[String]:

override protected def createTransformFunc: Seq[String] => Seq[String] = { param1 =>
  param1.foreach(println)
  param1
}
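For completeness, here is a minimal sketch of the whole transformer with that one change applied. The class name MySeqTransformer is hypothetical; everything else mirrors the question's code and the Spark 2.1 UnaryTransformer API:

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

// Hypothetical corrected transformer: identical to the question's MyTransformer,
// except the type parameters use Seq[String], matching how Spark SQL passes
// ArrayType values (a WrappedArray, which is a Seq) into the UDF.
class MySeqTransformer(override val uid: String)
  extends UnaryTransformer[Seq[String], Seq[String], MySeqTransformer] {

  override protected def createTransformFunc: Seq[String] => Seq[String] = { param1 =>
    param1.foreach(println) // same side effect as the original
    param1                  // pass the column value through unchanged
  }

  override protected def outputDataType: DataType = ArrayType(StringType)

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == ArrayType(StringType),
      s"Data type mismatch between ArrayType(StringType) and provided type $inputType.")
  }

  def this() = this(Identifiable.randomUID("tester"))
}

With this change, transformer.transform(dataframe) from the question should run without the exception: Spark deserializes an ArrayType column as scala.collection.mutable.WrappedArray$ofRef, which is a Seq[String] but not a [Ljava.lang.String; (i.e. not an Array[String]), which is exactly what the Caused by line in the stack trace reports.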

Upvotes: 1
