Powers

Reputation: 19308

SparkSessionExtensions injectFunction in Databricks environment

SparkSessionExtensions injectFunction works locally, but I can't get it working in the Databricks environment.

The itachi project defines Catalyst expressions, such as age, that I can successfully use locally via spark-sql:

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990');
10 years

In the Databricks environment, however, I haven't been able to get this working.

I started up a Databricks Community Edition cluster with the spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions configuration option set.

(screenshot: creating the cluster)

Then I attached the library.

(screenshot: attaching the itachi library)

The array_append function that's defined in itachi isn't accessible like I expected it to be:

(screenshot: the array_append call failing)

I confirmed the configuration option is properly set:

(screenshot: spark.sql.extensions shown in the cluster's Spark config)

spark-alchemy has another approach that works in the Databricks environment. Do we need to mess around with Spark internals to get this working in the Databricks environment? Or is there a way to get injectFunction working in Databricks?

Upvotes: 2

Views: 1069

Answers (1)

Alex Ott

Reputation: 87154

The spark.sql.extensions mechanism works fine on full Databricks (unless the extension reaches too deep into Spark internals, where there are sometimes incompatibilities), but not on Community Edition. The problem is that extensions configured via spark.sql.extensions are invoked during session initialization, while a library attached through the UI is installed only afterwards, so the extension classes aren't on the classpath when the session starts. On full Databricks this is worked around with an init script that installs the library before the cluster starts, but init scripts aren't available on Community Edition.
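On full Databricks, that init-script workaround can look roughly like this. This is a sketch, not from the original answer: the /databricks/jars path is a Databricks convention, and the Maven URL is assumed from the coordinates in the question.

```shell
#!/bin/bash
# Sketch of a cluster-scoped init script: place the itachi jar on the classpath
# before the Spark session initializes, so spark.sql.extensions can find it.
# Assumptions: the /databricks/jars directory and the Maven Central URL derived
# from the coordinates com.github.yaooqinn:itachi_2.12:0.1.0 used in the question.
JAR=itachi_2.12-0.1.0.jar
wget -q -O "/databricks/jars/${JAR}" \
  "https://repo1.maven.org/maven2/com/github/yaooqinn/itachi_2.12/0.1.0/${JAR}"
```

The script is configured once as a cluster-scoped init script; on Community Edition there is no UI for this, which is why the explicit registration below is needed.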

The workaround on Community Edition is to register the functions explicitly:

%scala
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

// Each entry is a (FunctionIdentifier, ExpressionInfo, FunctionBuilder) triple,
// which is exactly what FunctionRegistry.registerFunction expects.
Seq(
  Age.fd,
  FunctionAliases.array_cat,
  ArrayAppend.fd,
  ArrayLength.fd,
  IntervalJustifyLike.justifyDays,
  IntervalJustifyLike.justifyHours,
  IntervalJustifyLike.justifyInterval,
  Scale.fd,
  SplitPart.fd,
  StringToArray.fd,
  UnNest.fd
).foreach { case (name, info, builder) =>
  spark.sessionState.functionRegistry.registerFunction(name, info, builder)
}

After that it works:

(screenshot: array_append working in a notebook)

It's not as convenient as extensions, but that's a limitation of Community Edition.

Upvotes: 1
