James Alexander

Reputation: 6312

How can I convert BinaryType to Array[Byte] when calling a Scala UDF in Spark?

I've written the following UDF in Scala:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream

def Decompress(compressed: Array[Byte]): String = {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  scala.io.Source.fromInputStream(inputStream).mkString
}

val decompressUdf = (compressed: Array[Byte]) => {
  Decompress(compressed)
}

spark.udf.register("Decompress", decompressUdf)

I'm then attempting to call the UDF with the following:

val sessionsRawDF =
  sessionRawDF
    .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
    .select(
      current_timestamp().alias("ingesttime"),
      current_timestamp().cast("date").alias("p_ingestdate"),
      col("partition"),
      col("enqueuedTime"),
      col("WebsiteSession").alias("Json")
    )

When I run this, I get the following error:

command-130062350733681:9: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Array[Byte]
       decompressUdf(col("WebsiteSession")).alias("Json")

I was under the impression Spark would implicitly extract the value and convert it from the Spark type (BinaryType) to Array[Byte] in this case.

Would someone please help me understand what's going on? I've been fighting this for a while and I'm not sure what else to try.

Upvotes: 0

Views: 1142

Answers (1)

mck

Reputation: 42392

You need to convert the Scala function to a Spark UDF first, before you can apply it to a Column. For example:

import org.apache.spark.sql.functions.udf

// lift the Scala function into a UDF that operates on Columns
val decompressUdf = udf(Decompress _)

spark.udf.register("Decompress", decompressUdf)
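With that change, the call in your question compiles; a minimal sketch, assuming the same sessionRawDF with a binary body column:

import org.apache.spark.sql.functions.col

val sessionsRawDF =
  sessionRawDF.withColumn("WebsiteSession", decompressUdf(col("body")))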

In fact, there is no need to register the UDF at all if you're only using it through the DataFrame API: just run the first line and use decompressUdf directly. Registering is only needed if you want to call the UDF from SQL.
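For example, a short sketch of the SQL path (the view name sessions is hypothetical):

spark.udf.register("Decompress", decompressUdf)
sessionRawDF.createOrReplaceTempView("sessions") // hypothetical view name
spark.sql("SELECT Decompress(body) AS Json FROM sessions")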

Upvotes: 3
