Vitrion

Reputation: 415

Why does Spark infer a binary instead of an Array[Byte] when creating a DataFrame?

I have a DataFrame with two fields, "Name" and "Values". The first is a String and the second is an Array[Byte].

I want to apply a function to each record through a UDF and store the result in a new column. This works perfectly when "Values" is an Array[Int]. However, when it is an Array[Byte], the following error appears:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
  +- ExternalRDD [obj#43]

The full code is the following:

scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]

scala> df1.show
+----+----------------+
|Name|          Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+

scala> val twice = udf { (values: Seq[Byte]) =>
   |     val result = Array.ofDim[Byte](values.length)
   |     for (i <- values.indices)
   |         result(i) = (2 * values(i).toInt).toByte
   |     result
   | }
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))

scala> val df2 = df1.withColumn("TwoTimes", twice('Values))

I understand that the error comes from a data type mismatch (the UDF expects array<tinyint>, but the column is binary). What I do not understand is why Spark inferred my Array[Byte] as binary. Can someone explain this?

If I had to use Binary instead of Array[Byte], how should I handle it within my UDF?

To clarify, my original UDF does more than a trivial for loop. I understand that in this example the loop could be replaced by the map method.

Upvotes: 4

Views: 8102

Answers (1)

Shaido

Reputation: 28367

In Spark, Array[Byte] is represented as a BinaryType. From the documentation we can see:

public class BinaryType extends DataType
The data type representing Array[Byte] values. Please use the singleton DataTypes.BinaryType.

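Hence, Array[Byte] and binary are the same type in Spark. Seq[Byte], however, is mapped to ArrayType(ByteType), that is, array<tinyint>. Because the UDF declares a Seq[Byte] parameter, it expects array<tinyint>, and passing the binary "Values" column fails the type check shown in the error.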
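The difference between the two mappings can be checked directly on the inferred schema. This is a minimal sketch, assuming a local SparkSession; the object name ByteColumnTypes, the master setting, and the app name are hypothetical and only there to make the example self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType}

object ByteColumnTypes {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumed settings).
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("byte-column-types")
      .getOrCreate()
    import spark.implicits._

    // Array[Byte] is encoded as one binary value per row.
    val asArray = Seq(("one", Array[Byte](1, 2, 3))).toDF("Name", "Values")
    // Seq[Byte] is encoded as an array of tinyint elements.
    val asSeq = Seq(("one", Seq[Byte](1, 2, 3))).toDF("Name", "Values")

    assert(asArray.schema("Values").dataType == BinaryType)
    assert(asSeq.schema("Values").dataType == ArrayType(ByteType, containsNull = false))

    asArray.printSchema()
    asSeq.printSchema()

    spark.stop()
  }
}
```

If the column really needs to be array<tinyint>, building the rows with Seq[Byte] instead of Array[Byte] is one way to get it, and a UDF taking Seq[Byte] would then match.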

To fix the issue, simply replace Seq[Byte] with Array[Byte] in the udf:

val twice = udf { (values: Array[Byte]) =>
  val result = Array.ofDim[Byte](values.length)
  for (i <- values.indices)
    result(i) = (2 * values(i).toInt).toByte
  result
}
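For completeness, the corrected UDF can be applied to the DataFrame from the question roughly as follows. This is a sketch under assumptions: the object name TwiceOnBinary, the helper doubleBytes, and the local session settings are hypothetical, and the loop is written with map, as the question notes is possible for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object TwiceOnBinary {
  // Doubles every byte; results outside the Byte range wrap around on toByte.
  def doubleBytes(values: Array[Byte]): Array[Byte] =
    values.map(b => (2 * b).toByte)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumed settings).
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("twice-on-binary")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("one", Array[Byte](1, 2, 3, 4, 5)),
      ("two", Array[Byte](6, 7, 8, 9, 10))
    ).toDF("Name", "Values")

    // The parameter type Array[Byte] matches the binary column type.
    val twice = udf(doubleBytes _)
    val df2 = df1.withColumn("TwoTimes", twice(col("Values")))

    df2.printSchema()
    df2.show()

    spark.stop()
  }
}
```

Note that this sketch does not guard against null values in "Values"; a null row would reach doubleBytes as null and fail there.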

Upvotes: 6
