Abhra

Spark SQL: evaluating mathematical expressions stored as strings in a column

I have a DataFrame that contains mathematical expressions as strings, like this:

+--------+--------+-------+
|Column A|Column B|Formula|
+--------+--------+-------+
|10      |20      |x+y    |
|10      |20      |x      |
+--------+--------+-------+

where x stands for the value in Column A and y for the value in Column B. Using Spark SQL, I want to evaluate each expression and store the result in a new Column D.

I used a nested string REPLACE expression to substitute x and y with the actual values, but the result is still a string containing the numeric expression rather than its value.

REPLACE(REPLACE(Formula, 'x', `Column A`), 'y', `Column B`)

+--------+--------+-------+
|Column A|Column B|Formula|
+--------+--------+-------+
|10      |20      |10+20  |
|10      |20      |10     |
+--------+--------+-------+

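Expected output:

+--------+--------+-------+--------+
|Column A|Column B|Formula|Column D|
+--------+--------+-------+--------+
|10      |20      |10+20  |30      |
|10      |20      |10     |10      |
+--------+--------+-------+--------+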

Is there any way to do this without using UDFs? Any help would be greatly appreciated.

user238607

I am not sure how complex your expressions will get. If they become complex, you should probably evaluate them with Python's eval inside a UDF. You can do the same thing with a native Scala UDF, which does not carry the overhead of a Python UDF. Here's example code using both. Don't be afraid to write custom Scala UDFs: they are easy to write, and because they run inside the JVM they avoid the extra cost of Python UDFs, whose rows have to be sent to and from a separate Python worker process.
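Python's eval will execute any Python code that ends up in the formula column, so if the formulas are not fully trusted, a restricted evaluator is safer. The sketch below is a swapped-in technique, not the code used in this answer: it walks the expression's syntax tree with the standard ast module and only allows numbers, the supplied variable names, + - * / and unary +/-. The function name safe_eval is hypothetical.

```python
import ast
import operator

# Binary and unary operators the evaluator accepts; anything else is rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression, variables):
    """Evaluate an arithmetic expression such as 'x+y*x-x/y'.

    Only numeric literals, names present in `variables`, + - * / and unary
    +/- are allowed, so unlike eval() it cannot call functions or import modules.
    """

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.Name) and node.id in variables:
            # Variables are looked up by name, so no string substitution is needed.
            return variables[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported element: {ast.dump(node)}")

    return float(_eval(ast.parse(expression, mode="eval")))
```

Inside Spark it could be wrapped the same way as evaluate_exp, for example F.udf(lambda x, y, e: safe_eval(e, {"x": x, "y": y}), DoubleType()). It is still a Python UDF, so the performance cost is the same; the gain is that arbitrary code is rejected, and that other identifiers containing the letters x or y are no longer corrupted by a plain text replace.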

You can download the JAR used in the script below from:

https://github.com/dineshdharme/pyspark-native-udfs/tree/main/releases

The source of the Scala UDF that was compiled into this JAR is here:

https://github.com/dineshdharme/pyspark-native-udfs/tree/main/src/main/scala/com/help/udf

If you need to change the UDF, clone the project and rebuild the JAR with sbt assembly.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# Point spark.jars at the downloaded assembly JAR so the Scala UDF class is on the classpath.
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs-assembly-0.1.1.jar") \
    .getOrCreate()


data1 = [
    [3.0, 9.2, "x+y"],
    [2.223, 1.343, "x+y*x-x/y"],
    [454.0, 12.34223, "x+y+2*y"],
]


def evaluate_exp(x, y, given_exp):
    # Substitute the column values into the formula text, then evaluate it.
    # Note: eval() runs arbitrary Python, so only use it on trusted formulas.
    new_exp = given_exp.replace("x", str(x))
    new_exp = new_exp.replace("y", str(y))
    return float(eval(new_exp))


# Declare the return type explicitly; F.udf defaults to StringType otherwise.
evaluate_exp_udf = F.udf(evaluate_exp, DoubleType())

df1Columns = ["x", "y", "expression"]
df1 = spark.createDataFrame(data=data1, schema=df1Columns)

df1.show(n=100, truncate=False)


df_result = df1.withColumn("result_python", evaluate_exp_udf(F.col("x"), F.col("y"), F.col("expression")))
print("Result using PYTHON UDF")
df_result.show(n=100, truncate=False)


# Register the compiled Scala class from the JAR as a SQL function returning double.
spark.udf.registerJavaFunction("evaluate_exp_udf", "com.help.udf.EvaluateExpression", DoubleType())

df1.createOrReplaceTempView("given_table")

df1_array = spark.sql("select *, evaluate_exp_udf(x, y, expression) as result_scala from given_table")
print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)

Scala code for the UDF used above:

package com.help.udf

import org.apache.spark.sql.api.java.UDF3
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox


// Java-style UDF3 (x, y, expression) => result, so it can be registered
// from PySpark with spark.udf.registerJavaFunction.
class EvaluateExpression extends UDF3[Double, Double, String, Double] {

  override def call(x_value: Double, y_value: Double, given_expression: String): Double = {

    // Substitute the numeric values for the variable names.
    var new_expression = given_expression.replaceAll("x", x_value.toString)
    new_expression = new_expression.replaceAll("y", y_value.toString)

    // Parse and evaluate the resulting Scala expression at runtime.
    // Note: this builds a ToolBox and compiles the expression on every row, which is slow.
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))

    calc.toString.toDouble
  }
}

Output:

+-----+--------+----------+
|x    |y       |expression|
+-----+--------+----------+
|3.0  |9.2     |x+y       |
|2.223|1.343   |x+y*x-x/y |
|454.0|12.34223|x+y+2*y   |
+-----+--------+----------+

Result using PYTHON UDF
+-----+--------+----------+-----------------+
|x    |y       |expression|result_python    |
+-----+--------+----------+-----------------+
|3.0  |9.2     |x+y       |12.2             |
|2.223|1.343   |x+y*x-x/y |3.553239558451229|
|454.0|12.34223|x+y+2*y   |491.02669        |
+-----+--------+----------+-----------------+

Dataframe after applying SCALA NATIVE UDF
+-----+--------+----------+-----------------+
|x    |y       |expression|result_scala     |
+-----+--------+----------+-----------------+
|3.0  |9.2     |x+y       |12.2             |
|2.223|1.343   |x+y*x-x/y |3.553239558451229|
|454.0|12.34223|x+y+2*y   |491.02669        |
+-----+--------+----------+-----------------+
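If the goal is to avoid UDFs entirely and the number of distinct formulas is small, one alternative (not part of the original answer) is to generate a CASE expression with one branch per formula, rewrite x and y as column references, and let Spark SQL evaluate the arithmetic natively. The helper build_case_expression below is hypothetical and only builds the SQL string; it assumes the formulas are trusted arithmetic (it rejects quotes and semicolons) and that the columns are named as in the question (Column A, Column B, Formula).

```python
import re

# Formulas may only contain identifiers, numbers, whitespace, commas,
# parentheses and + - * / ; quotes and semicolons are rejected.
_ALLOWED = re.compile(r"[\w\s.,+\-*/()]+")


def build_case_expression(formulas, column_map, formula_col="Formula", default="NULL"):
    """Build a Spark SQL CASE expression with one WHEN branch per distinct formula.

    Each branch matches the literal formula text in `formula_col` and evaluates
    the same formula with its variables replaced by backticked column names.
    """
    # Match whole variable names only (so the 'x' in 'exp' is left alone)
    # and substitute all variables in a single pass.
    names = sorted(column_map, key=len, reverse=True)
    pattern = re.compile(r"\b(" + "|".join(map(re.escape, names)) + r")\b")

    branches = []
    for formula in sorted(set(formulas)):
        if not _ALLOWED.fullmatch(formula):
            raise ValueError(f"unsupported characters in formula {formula!r}")
        sql_expr = pattern.sub(lambda m: f"`{column_map[m.group(1)]}`", formula)
        branches.append(f"WHEN `{formula_col}` = '{formula}' THEN {sql_expr}")

    if not branches:
        return default
    return "CASE " + " ".join(branches) + f" ELSE {default} END"
```

With PySpark this might be used as formulas = [r["Formula"] for r in df.select("Formula").distinct().collect()] followed by df.withColumn("Column D", F.expr(build_case_expression(formulas, {"x": "Column A", "y": "Column B"}))). Rows whose formula was not among the collected ones get NULL, and since every distinct formula becomes its own branch, this only scales to a modest number of formulas.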
