I have a DataFrame that contains mathematical expressions stored as strings, for example:
Column A | Column B | Formula |
---|---|---|
10 | 20 | x+y |
10 | 20 | x |
where x stands for the value in Column A and y for the value in Column B. Using Spark SQL, I am trying to evaluate each expression to compute a Column D that holds the result.
I used a string REPLACE expression that substitutes x and y with the actual values, but the result is still just a string containing a numeric expression:
REPLACE(REPLACE(Formula, 'x', `Column A`), 'y', `Column B`)
Column A | Column B | Formula |
---|---|---|
10 | 20 | 10+20 |
10 | 20 | 10 |
Expected output:
Column A | Column B | Formula | Column D |
---|---|---|---|
10 | 20 | 10+20 | 30 |
10 | 20 | 10 | 10 |
Is there any way to do this without using UDFs? Any help would be greatly appreciated.
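For the no-UDF case asked about here, one option when the set of distinct formulas is small is to translate each formula into a Spark SQL arithmetic expression and let a single CASE WHEN pick the right one per row. Below is a minimal sketch of the string-building part in plain Python. It assumes formulas only use x, y, numbers, spaces, parentheses and + - * /; the helper names formula_to_sql and build_case_sql are hypothetical, and the column names are taken from the tables above.

```python
import re

# Assumption: a formula may only contain the variables x and y, digits,
# decimal points, spaces, parentheses and the operators + - * /.
# This also keeps quotes out, so a formula is safe inside a SQL string literal.
_ALLOWED_FORMULA = re.compile(r"[0-9xy+\-*/(). ]+")


def formula_to_sql(formula, x_col="`Column A`", y_col="`Column B`"):
    """Translate one formula into a Spark SQL arithmetic expression."""
    if not _ALLOWED_FORMULA.fullmatch(formula):
        raise ValueError(f"unsupported formula: {formula!r}")
    # x and y are the only letters allowed, so every occurrence is a variable.
    columns = {"x": x_col, "y": y_col}
    return re.sub(r"[xy]", lambda m: columns[m.group(0)], formula)


def build_case_sql(formulas, formula_col="Formula", x_col="`Column A`", y_col="`Column B`"):
    """Build one CASE WHEN expression that evaluates whichever formula a row holds."""
    branches = " ".join(
        f"WHEN {formula_col} = '{f}' THEN ({formula_to_sql(f, x_col, y_col)})"
        for f in formulas
    )
    return f"CASE {branches} END"


print(build_case_sql(["x+y", "x"]))
```

In PySpark the generated string would then go to F.expr, e.g. df.withColumn("Column D", F.expr(build_case_sql(formulas))), where formulas is the list of distinct non-null values from [r.Formula for r in df.select("Formula").distinct().collect()]. This works on the original Formula column (still containing x and y), and it only stays practical while the number of distinct formulas is small, because each one becomes a branch of the expression.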
I am not sure how complex your expressions will get in the future. If they do get complex, you should probably use Python's eval inside a UDF to evaluate them. You can do the same thing in a native Scala UDF, which doesn't carry the penalty of a Python UDF. Here's example code using both. Don't be afraid to write custom Scala UDFs: they are easy to write, and they run inside the JVM like native Java code, so they avoid the overhead of Python UDFs, which need a separate Python worker process and have to ship every row between the JVM and Python.
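eval executes any Python code that ends up in the formula column. If the formulas are not fully trusted, a restricted evaluator that walks the parsed syntax tree and accepts only numbers, known variable names and arithmetic operators is a safer alternative. This is a swapped-in technique (Python's ast module), not the code used in this answer; safe_evaluate is a hypothetical helper, and the operator set is an assumption matching the formulas shown here.

```python
import ast
import operator

# Only these arithmetic operators are accepted; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_evaluate(expression, variables):
    """Evaluate an arithmetic expression over named variables without eval()."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        # Numeric literals (bool is excluded even though it subclasses int)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        # Variable names are looked up in the supplied mapping
        if isinstance(node, ast.Name) and node.id in variables:
            return variables[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Calls, attribute access, unknown names, etc. are all refused
        raise ValueError(f"unsupported syntax: {ast.dump(node)}")

    return float(_eval(ast.parse(expression, mode="eval")))


print(safe_evaluate("x+y*x-x/y", {"x": 2.223, "y": 1.343}))
```

Inside the Python UDF, return safe_evaluate(given_exp, {"x": x, "y": y}) could replace the replace-then-eval steps, which also removes the need to substitute values into the string at all.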
You can download the jar used in the following script from:
https://github.com/dineshdharme/pyspark-native-udfs/tree/main/releases
The source of the Scala UDF that was compiled into this jar is here:
https://github.com/dineshdharme/pyspark-native-udfs/tree/main/src/main/scala/com/help/udf
If you need to change the UDF, clone the project and rebuild the jar with sbt assembly.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# Put the assembly jar containing the Scala UDF on the classpath
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs-assembly-0.1.1.jar") \
    .getOrCreate()

data1 = [
    [3.0, 9.2, "x+y"],
    [2.223, 1.343, "x+y*x-x/y"],
    [454.0, 12.34223, "x+y+2*y"],
]

def evaluate_exp(x, y, given_exp):
    # Substitute the row's values into the formula string, then evaluate it.
    # Note: eval() runs arbitrary Python code, so only use it on formulas you trust.
    new_exp = given_exp.replace("x", str(x))
    new_exp = new_exp.replace("y", str(y))
    # Cast to float so the value matches the declared DoubleType
    return float(eval(new_exp))

# Without an explicit return type, F.udf defaults to StringType
evaluate_exp_udf = F.udf(evaluate_exp, DoubleType())

df1Columns = ["x", "y", "expression"]
df1 = spark.createDataFrame(data=data1, schema=df1Columns)
df1.show(n=100, truncate=False)

df_result = df1.withColumn("result_python", evaluate_exp_udf(F.col("x"), F.col("y"), F.col("expression")))
print("Result using PYTHON UDF")
df_result.show(n=100, truncate=False)

# Register the Scala class from the jar as a SQL function
spark.udf.registerJavaFunction("evaluate_exp_udf", "com.help.udf.EvaluateExpression", DoubleType())
df1.createOrReplaceTempView("given_table")
df1_array = spark.sql("select *, evaluate_exp_udf(x, y, expression) as result_scala from given_table")
print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)
Scala code for the UDF used above:
package com.help.udf

import org.apache.spark.sql.api.java.UDF3
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

// Java-style UDF, so it can be registered from PySpark with spark.udf.registerJavaFunction
class EvaluateExpression extends UDF3[Double, Double, String, Double] {
  override def call(x_value: Double, y_value: Double, given_expression: String): Double = {
    // Literal replacement of every x and y; the parentheses keep negative values
    // parseable (without them, y-x with x = -2.0 would become 9.2--2.0)
    val new_expression = given_expression
      .replace("x", s"($x_value)")
      .replace("y", s"($y_value)")
    // Parse and evaluate the string as Scala code at runtime. This needs the
    // scala-compiler library, and building a ToolBox on every call is expensive.
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))
    calc.toString.toDouble
  }
}
Output:
+-----+--------+----------+
|x |y |expression|
+-----+--------+----------+
|3.0 |9.2 |x+y |
|2.223|1.343 |x+y*x-x/y |
|454.0|12.34223|x+y+2*y |
+-----+--------+----------+
Result using PYTHON UDF
+-----+--------+----------+-----------------+
|x |y |expression|result_python |
+-----+--------+----------+-----------------+
|3.0 |9.2 |x+y |12.2 |
|2.223|1.343 |x+y*x-x/y |3.553239558451229|
|454.0|12.34223|x+y+2*y |491.02669 |
+-----+--------+----------+-----------------+
Dataframe after applying SCALA NATIVE UDF
+-----+--------+----------+-----------------+
|x |y |expression|result_scala |
+-----+--------+----------+-----------------+
|3.0 |9.2 |x+y |12.2 |
|2.223|1.343 |x+y*x-x/y |3.553239558451229|
|454.0|12.34223|x+y+2*y |491.02669 |
+-----+--------+----------+-----------------+
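The Python UDF in the script substitutes values with plain str.replace, and the Scala UDF uses the same kind of plain string replacement. That is fine for the formulas shown, but it would also rewrite the x inside any other identifier (for example a function name such as max), and in Python a bare negative value changes meaning under the ** operator. A sketch of whole-word substitution with Python's re module, assuming variable names are plain identifiers; substitute_variables is a hypothetical helper.

```python
import re


def substitute_variables(expression, values):
    """Replace whole-word variable names with their values, wrapped in parentheses."""
    if not values:
        return expression
    # \b anchors restrict matches to standalone identifiers, so the "x" in "max" is left alone.
    pattern = re.compile(r"\b(" + "|".join(map(re.escape, values)) + r")\b")
    # Parentheses keep negative values correct, e.g. (-2.0)**2 rather than -2.0**2.
    return pattern.sub(lambda m: f"({values[m.group(1)]!r})", expression)


row_values = {"x": 3.0, "y": 9.2}
print("max(x, y)".replace("x", "3.0").replace("y", "9.2"))  # ma3.0(3.0, 9.2)
print(substitute_variables("max(x, y)", row_values))        # max((3.0), (9.2))
```

The parenthesized output is still plain Python source, so it can be passed to eval exactly as in the UDF above.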