Roman

Spark UnaryTransformer implementation fails with scala.MatchError

I'm implementing a UnaryTransformer in Spark 1.6.2 with this interface:

class myUT(override val uid: String) extends UnaryTransformer[Seq[String], Seq[String], myUT] {
  ...
  // Append "s" to every element of the input sequence
  override protected def createTransformFunc: Seq[String] => Seq[String] = {
    seq => seq.map(x => x + "s")
  }
}

This compiles fine, but at runtime it fails with this error:

17/07/21 22:29:33 WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3, myhost.com.au): scala.MatchError: ArrayBuffer(<contents of my array>) (of class scala.collection.mutable.ArrayBuffer)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)

The next thing I tried was replacing the mapping function with an identity function:

seq => seq

So, in theory, the data should not change at all! But the error I got was:

17/07/21 22:11:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, myhost.com.au): scala.MatchError: WrappedArray(<contents of my array>) (of class scala.collection.mutable.WrappedArray$ofRef)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)

So it looks like the type of the outgoing data gets changed anyway. How do I avoid this?

Update: Next I tried adding .toArray after the map. Now I get this compile error:

[error] /sparkprj/src/main/scala/sp_txt.scala:43: polymorphic expression cannot be instantiated to expected type;
[error]  found   : [B >: String]Array[B]
[error]  required: Seq[String]
[error]                                           ).toArray

It adds some detail, but not much to my understanding. After reviewing a few MLlib UnaryTransformer examples, I'm inclined to believe it's a bug in Catalyst.
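The compile error in the update is a plain Scala typing issue, separate from the runtime MatchError. toArray is declared as toArray[B >: A : ClassTag]: Array[B], so when the expected type is Seq[String] the compiler cannot pick a B for which Array[B] is a Seq[String], and it reports "polymorphic expression cannot be instantiated to expected type". A minimal sketch, assuming Scala 2.10/2.11 as used by Spark 1.6; ToArrayDemo, withArray and plain are hypothetical names:

```scala
object ToArrayDemo {
  // Does not compile: toArray[B >: String] yields Array[B], which does not
  // conform to the expected Seq[String], so type inference fails on B.
  // val broken: Seq[String] => Seq[String] = seq => seq.map(_ + "s").toArray

  // Compiles: with B fixed to String, Predef's implicit conversion
  // Array[String] => WrappedArray[String] (Scala 2.10/2.11) supplies the Seq.
  val withArray: Seq[String] => Seq[String] = seq => seq.map(_ + "s").toArray[String]

  // Simplest: map on a Seq[String] already returns a Seq[String].
  val plain: Seq[String] => Seq[String] = seq => seq.map(_ + "s")
}
```

Since map already returns a Seq[String], the extra .toArray conversion is not needed at all.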

Answers (1)

Roman

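The outputDataType definition in class myUT was wrong. It has to be:

override protected def outputDataType: DataType = new ArrayType(StringType, true)

Because I had copied this class definition from a String -> String transformer, outputDataType was still defined as plain StringType. Catalyst therefore tried to convert each Seq[String] result with its String converter (CatalystTypeConverters$StringConverter in the stack trace), which threw the MatchError. My bad.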
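Putting the pieces together, here is a minimal sketch of a complete myUT for Spark 1.6, assuming the spark-mllib 1.6 artifact is on the classpath. Only createTransformFunc and the corrected outputDataType come from this thread; the no-argument constructor, the validateInputType check, the copy override and the companion-object helper pluralize are illustrative additions, not part of the original code:

```scala
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

// Sketch: appends "s" to every string in an array<string> column.
class myUT(override val uid: String)
    extends UnaryTransformer[Seq[String], Seq[String], myUT] {

  // Illustrative convenience constructor with a generated uid.
  def this() = this(Identifiable.randomUID("myUT"))

  override protected def createTransformFunc: Seq[String] => Seq[String] =
    myUT.pluralize

  // Must describe OUT (Seq[String]). Declaring StringType here makes Catalyst
  // hand each result to its String converter, which throws the MatchError.
  override protected def outputDataType: DataType =
    ArrayType(StringType, containsNull = true)

  // Illustrative: fail fast when the input column is not array<string>.
  override protected def validateInputType(inputType: DataType): Unit = {
    val isStringArray = inputType match {
      case ArrayType(StringType, _) => true
      case _                        => false
    }
    require(isStringArray, s"Input type must be ArrayType(StringType) but got $inputType.")
  }

  override def copy(extra: ParamMap): myUT = defaultCopy(extra)
}

object myUT {
  // Hypothetical helper: the plain transform function, testable without a SparkContext.
  val pluralize: Seq[String] => Seq[String] = seq => seq.map(_ + "s")
}
```

Usage, with a DataFrame df whose (hypothetical) column words is array<string>: new myUT().setInputCol("words").setOutputCol("plurals").transform(df) should add a plurals column of the same array<string> type. Keeping the function in the companion object is just a design choice so the logic can be checked without starting Spark.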
