Reputation: 23
I have a dataframe with many columns, and one of the columns holds the logical operation that I need to perform on the dataframe. As an example, look at the dataframe below.
I need to apply the logical operation defined in the logical operation column to the relevant rows.
Normally I am able to use expr(). But in this case, when I read the expression from a column and then apply it, I get an error saying the column is not iterable.
Any suggestions?
Upvotes: 1
Views: 371
Reputation: 2478
Here's a solution using a Scala UDF in PySpark; Scala UDFs are generally faster than Python UDFs. You can find the code for the UDF and the release jar used in the PySpark script in the following repository.
If you want to modify the UDF to suit your future needs, all you need to do is run sbt assembly to compile the jar.
Then call the com.help.stackoverflow.CheckUDFs class from the jar to verify correct implementation.
https://github.com/dineshdharme/pyspark-native-udfs
Source code for the EvaluateBooleanExpression class:
package com.help.udf

import org.apache.spark.sql.api.java.UDF4
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

class EvaluateBooleanExpression extends UDF4[Int, Int, Int, String, Boolean] {
  override def call(a_value: Int, b_value: Int, c_value: Int, given_expression: String): Boolean = {
    // Substitute the column names with their values (expected to be 0 or 1)
    var new_expression = given_expression.replaceAll("A", a_value.toString)
    new_expression = new_expression.replaceAll("B", b_value.toString)
    new_expression = new_expression.replaceAll("C", c_value.toString)
    // Turn the digits into Scala boolean literals
    new_expression = new_expression.replaceAll("0", false.toString)
    new_expression = new_expression.replaceAll("1", true.toString)
    //println("Here's the new expression ", new_expression)
    // Compile and evaluate the resulting expression at runtime
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))
    calc.toString.toBoolean
  }
}
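To make the substitution step in the UDF easier to follow, here is a minimal pure-Python sketch of the same string rewriting. The function name substitute is hypothetical and not part of the repository; it only mirrors the replaceAll calls and assumes, as the UDF does, that the values are 0 or 1.

```python
def substitute(a: int, b: int, c: int, expression: str) -> str:
    """Mirror of the UDF's textual substitution (values assumed to be 0 or 1)."""
    # Replace the column letters with their values first ...
    for name, value in (("A", a), ("B", b), ("C", c)):
        expression = expression.replace(name, str(value))
    # ... then turn the digits into Scala boolean literals.
    return expression.replace("0", "false").replace("1", "true")


if __name__ == "__main__":
    print(substitute(0, 1, 1, "(A&B)"))  # (false&true)
    print(substitute(0, 0, 1, "(A|C)"))  # (false|true)
```

Because the rewriting is purely textual, a value other than 0 or 1 (for example 10) would turn into something like truefalse, so this approach only seems safe for strictly binary columns.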
PySpark Python script:
import pyspark.sql.functions as F
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs/releases/pyspark-native-udfs-assembly-0.1.3.jar") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
data1 = [
    [0, 1, 1, "(A&B)"],
    [1, 1, 1, "(A)"],
    [0, 0, 1, "(A|C)"],
]
df1Columns = ["A", "B", "C", "exp"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1 = df1.withColumn("A", F.col("A").cast("int"))
df1 = df1.withColumn("B", F.col("B").cast("int"))
df1 = df1.withColumn("C", F.col("C").cast("int"))
print("Schema of the dataframe")
df1.printSchema()
print("Given dataframe")
df1.show(n=100, truncate=False)
spark.udf.registerJavaFunction("evaluate_boolean_exp_udf", "com.help.udf.EvaluateBooleanExpression", BooleanType())
df1.createOrReplaceTempView("given_table")
df1_array = sqlContext.sql("select *, evaluate_boolean_exp_udf(A, B, C, exp) as bool_exp_evaluated from given_table")
print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)
Output:
Schema of the dataframe
root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- exp: string (nullable = true)
Given dataframe
+---+---+---+-----+
|A  |B  |C  |exp  |
+---+---+---+-----+
|0  |1  |1  |(A&B)|
|1  |1  |1  |(A)  |
|0  |0  |1  |(A|C)|
+---+---+---+-----+
Dataframe after applying SCALA NATIVE UDF
+---+---+---+-----+------------------+
|A  |B  |C  |exp  |bool_exp_evaluated|
+---+---+---+-----+------------------+
|0  |1  |1  |(A&B)|false             |
|1  |1  |1  |(A)  |true              |
|0  |0  |1  |(A|C)|true              |
+---+---+---+-----+------------------+
Upvotes: 1
Reputation: 14905
You can use the standard Python eval function inside a UDF.
The eval function expects the data to be in a dict, so we transform the data columns into a struct first:
from pyspark.sql import functions as F
eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))
df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
.withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
.show()
Output:
+---+---+---+-----------------+---------+------+
|  A|  B|  C|logical_operation|     data|result|
+---+---+---+-----------------+---------+------+
|  0|  1|  1|            (A&B)|{0, 1, 1}|     0|
|  1|  1|  1|              (A)|{1, 1, 1}|     1|
|  0|  0|  1|            (A|C)|{0, 0, 1}|     1|
+---+---+---+-----------------+---------+------+
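For readers who want to see what the UDF does for each row without starting Spark, the following is a minimal plain-Python sketch. The rows list and the evaluate_row helper are hypothetical stand-ins for the dataframe and the lambda; the dict built per row plays the role of data.asDict().

```python
# Hypothetical stand-in for the dataframe rows used in this thread.
rows = [
    {"A": 0, "B": 1, "C": 1, "logical_operation": "(A&B)"},
    {"A": 1, "B": 1, "C": 1, "logical_operation": "(A)"},
    {"A": 0, "B": 0, "C": 1, "logical_operation": "(A|C)"},
]


def evaluate_row(row: dict) -> int:
    # Everything except the expression column becomes the local namespace.
    data = {k: v for k, v in row.items() if k != "logical_operation"}
    # eval(expression, globals, locals): names A, B, C resolve from `data`.
    return eval(row["logical_operation"], {}, data)


results = [evaluate_row(r) for r in rows]

if __name__ == "__main__":
    print(results)  # [0, 1, 1]
```

Since & and | act as bitwise operators on Python integers, the results are the integers 0 and 1 rather than booleans, which matches the result column above.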
Note that eval comes with some security concerns, so please check whether this could be a problem for you!
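If the expression column cannot be fully trusted, one possible mitigation is to parse the expression with the standard ast module and allow only column names, the literals 0 and 1, and the bitwise operators. This is a sketch under those assumptions, not part of the answer above; safe_eval is a hypothetical helper name, and it could be called from the UDF in place of eval.

```python
import ast
import operator

# Only these binary operators are accepted; anything else is rejected.
_ALLOWED_BINOPS = {
    ast.BitAnd: operator.and_,
    ast.BitOr: operator.or_,
    ast.BitXor: operator.xor,
}


def safe_eval(expression: str, values: dict) -> int:
    """Evaluate a restricted logical expression such as "(A&B)|C"."""

    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Name):
            if node.id not in values:
                raise ValueError(f"unknown name: {node.id}")
            return values[node.id]
        if isinstance(node, ast.Constant) and type(node.value) is int and node.value in (0, 1):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_BINOPS:
            return _ALLOWED_BINOPS[type(node.op)](walk(node.left), walk(node.right))
        # Calls, attribute access, other literals, etc. all end up here.
        raise ValueError(f"disallowed expression element: {type(node).__name__}")

    return walk(ast.parse(expression, mode="eval"))


if __name__ == "__main__":
    print(safe_eval("(A&B)", {"A": 0, "B": 1, "C": 1}))  # 0
    print(safe_eval("(A|C)", {"A": 0, "B": 0, "C": 1}))  # 1
```

The whitelist is deliberately narrow: function calls such as `__import__('os')` raise ValueError instead of being executed, at the cost of rejecting any operator that is not listed.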
Upvotes: 3