Reputation: 6312
I've written the following UDF in Scala:
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.GZIPInputStream

def Decompress(compressed: Array[Byte]): String = {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  scala.io.Source.fromInputStream(inputStream).mkString
}

val decompressUdf = (compressed: Array[Byte]) => {
  Decompress(compressed)
}

spark.udf.register("Decompress", decompressUdf)
I'm then attempting to call the UDF with the following:
val sessionsRawDF =
  sessionRawDF
    .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
    .select(
      current_timestamp().alias("ingesttime"),
      current_timestamp().cast("date").alias("p_ingestdate"),
      col("partition"),
      col("enqueuedTime"),
      col("WebsiteSession").alias("Json")
    )
When I run this, I get the following error:
command-130062350733681:9: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: Array[Byte]
       decompressUdf(col("WebsiteSession")).alias("Json")
I was under the impression that Spark would implicitly extract the value from the Column and convert the Spark type to Array[Byte] in this case.
Would someone please help me understand what's going on? I've been fighting this for a while and I'm not sure what else to try.
Upvotes: 0
Views: 1142
Reputation: 42392
You need to convert the Scala function to a Spark UDF before you can register and use it as one. For example,
import org.apache.spark.sql.functions.udf

val decompressUdf = udf(Decompress _)
spark.udf.register("Decompress", decompressUdf)
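With decompressUdf now a proper UserDefinedFunction, it accepts Column arguments, so the call from your question compiles; a minimal sketch of the corrected call site:

// decompressUdf now takes a Column and returns a Column
sessionRawDF.withColumn("WebsiteSession", decompressUdf(col("body")))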
In fact, there is no need to register the UDF if you're only using it in the DataFrame API. You can simply run the first line and use decompressUdf directly. Registering is only needed if you want to use the UDF in SQL.
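For the SQL path, the registered name can then be called like a built-in function; a minimal sketch, assuming a hypothetical temp view named sessions created from sessionRawDF:

// "sessions" is a hypothetical view name, used here only for illustration
sessionRawDF.createOrReplaceTempView("sessions")
spark.sql("SELECT Decompress(body) AS Json FROM sessions").show()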
Upvotes: 3