Naveen Balachandran

Reputation: 23

Apply logical operation on a dataframe in pyspark

I have a dataframe with many columns, and one of the columns holds the logical operation that I need to perform on the dataframe. As an example, the dataframe looks like this:

+---+---+---+-----------------+
|  A|  B|  C|logical_operation|
+---+---+---+-----------------+
|  0|  1|  1|            (A&B)|
|  1|  1|  1|              (A)|
|  0|  0|  1|            (A|C)|
+---+---+---+-----------------+

I need to perform the logical operation defined in the logical_operation column on the corresponding row.

In a normal scenario I am able to use expr(). But in this case, when I try to read the expression from a column and then apply it, I get an error saying the column is not iterable.
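The failing attempt presumably looked something like this (a sketch; the exact code is not shown in the question):

from pyspark.sql import functions as F

# Fails with "column is not iterable": expr() takes a Python string,
# not a Column, so the expression cannot be read from a column this way.
df = df.withColumn("result", F.expr(F.col("logical_operation")))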

Any suggestions?

Upvotes: 1

Views: 371

Answers (2)

user238607

Reputation: 2478

Here's a solution using a Scala UDF in PySpark, since Scala UDFs are faster than Python UDFs. You can find the code for the UDF and the release jar used in the PySpark script in the following repository.

If you want to modify the UDF to suit your future needs, all you need to do is run sbt assembly to recompile the jar.

Then call the com.help.stackoverflow.CheckUDFs class from the jar to verify the implementation is correct.

https://github.com/dineshdharme/pyspark-native-udfs

Source code for the EvaluateBooleanExpression class:

package com.help.udf

import org.apache.spark.sql.api.java.UDF4

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox


class EvaluateBooleanExpression extends UDF4[Int, Int, Int, String, Boolean] {

  override def call(a_value: Int, b_value: Int, c_value: Int, given_expression: String): Boolean = {

    // Substitute the column values for the variable names,
    // e.g. "(A&B)" with A=0, B=1 becomes "(0&1)".
    var new_expression = given_expression.replaceAll("A", a_value.toString)
    new_expression = new_expression.replaceAll("B", b_value.toString)
    new_expression = new_expression.replaceAll("C", c_value.toString)

    // Turn the 0/1 digits into Scala boolean literals, "(0&1)" -> "(false&true)".
    // This assumes the values are only ever 0 or 1.
    new_expression = new_expression.replaceAll("0", false.toString)
    new_expression = new_expression.replaceAll("1", true.toString)

    // Compile and evaluate the string as Scala code; & and | are
    // defined on Boolean, so "(false&true)" evaluates to false.
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))

    calc.toString.toBoolean
  }
}
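To see what the UDF does with a single row, here is a rough Python trace of the same substitute-then-evaluate idea (assuming, as above, that the values are only 0 or 1):

expression = "(A&B)".replace("A", "0").replace("B", "1")            # -> "(0&1)"
expression = expression.replace("0", "False").replace("1", "True")  # -> "(False&True)"
print(eval(expression))  # False; Python's & and | also work on booleans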

PySpark script:

from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs/releases/pyspark-native-udfs-assembly-0.1.3.jar") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)


data1 = [
    [0, 1, 1, "(A&B)"],
    [1, 1, 1, "(A)"],
    [0, 0, 1, "(A|C)"],
]

df1Columns = ["A", "B", "C", "exp"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# Cast to int so the types match the Scala UDF4[Int, Int, Int, String, Boolean] signature.
df1 = df1.withColumn("A", F.col("A").cast("int"))
df1 = df1.withColumn("B", F.col("B").cast("int"))
df1 = df1.withColumn("C", F.col("C").cast("int"))

print("Schema of the dataframe")
df1.printSchema()

print("Given dataframe")
df1.show(n=100, truncate=False)



# Register the Scala UDF from the jar so it can be called from Spark SQL.
spark.udf.registerJavaFunction("evaluate_boolean_exp_udf", "com.help.udf.EvaluateBooleanExpression", BooleanType())

df1.createOrReplaceTempView("given_table")

df1_array = sqlContext.sql("select *, evaluate_boolean_exp_udf(A, B, C, exp) as bool_exp_evaluated from given_table")
print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)
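
As a side note (not part of the original script), the registered function can also be invoked through the DataFrame API via expr, which is equivalent to the SQL query above:

df1_array = df1.withColumn(
    "bool_exp_evaluated",
    F.expr("evaluate_boolean_exp_udf(A, B, C, exp)")  # calls the registered Scala UDF
)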

Output:

Schema of the dataframe
root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- exp: string (nullable = true)

Given dataframe
+---+---+---+-----+
|A  |B  |C  |exp  |
+---+---+---+-----+
|0  |1  |1  |(A&B)|
|1  |1  |1  |(A)  |
|0  |0  |1  |(A|C)|
+---+---+---+-----+

Dataframe after applying SCALA NATIVE UDF
+---+---+---+-----+------------------+
|A  |B  |C  |exp  |bool_exp_evaluated|
+---+---+---+-----+------------------+
|0  |1  |1  |(A&B)|false             |
|1  |1  |1  |(A)  |true              |
|0  |0  |1  |(A|C)|true              |
+---+---+---+-----+------------------+

Upvotes: 1

werner

Reputation: 14905

You can use the standard Python eval function inside of a UDF.

eval expects the variable bindings as a dict, so we first pack the data columns into a struct; inside the UDF the struct arrives as a Row, which asDict() turns into that dict:

from pyspark.sql import functions as F

# eval() resolves A, B, C against the row's values, passed as the locals dict;
# the globals dict is left empty.
eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))

# Pack every data column (everything except the expression itself) into a
# struct so the UDF receives them as a single Row.
df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
    .withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
    .show()

Output:

+---+---+---+-----------------+---------+------+
|  A|  B|  C|logical_operation|     data|result|
+---+---+---+-----------------+---------+------+
|  0|  1|  1|            (A&B)|{0, 1, 1}|     0|
|  1|  1|  1|              (A)|{1, 1, 1}|     1|
|  0|  0|  1|            (A|C)|{0, 0, 1}|     1|
+---+---+---+-----------------+---------+------+
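
Note that the UDF above declares no return type, so the result column holds strings "0"/"1" (the stringified result of eval). If you would rather get a real boolean column, a small variation (my addition, not part of the original answer) is:

from pyspark.sql.types import BooleanType

# Same idea, but cast eval's 0/1 result to bool and declare the return type.
eval_bool_udf = F.udf(lambda op, data: bool(eval(op, {}, data.asDict())), BooleanType())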

eval comes with some security concerns, so please check whether this could be a problem for you!
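
One concrete hardening step (an assumption on my part, not from the original answer): even with an empty globals dict, Python injects __builtins__ into eval, so the expressions can still call built-in functions; passing an explicitly empty __builtins__ blocks that:

# Not a full sandbox, just a basic guard against calls to built-ins.
eval_udf = F.udf(lambda op, data: eval(op, {'__builtins__': {}}, data.asDict()))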

Upvotes: 3
