user3222101

Reputation: 1330

How to make dynamic query filter run in pyspark?

I need to create a parameterized solution that can run different filters. For example, I am currently using the query below to apply a filter on a dataframe:

input_df.filter("not is_deleted and status == 'Active' and brand in ('abc', 'def')")

I need to change this approach and build the query from configuration:

filters_to_apply = {'table_name': 'input_df',
                    'rule_1': 'not is_deleted',
                    'rule_2': 'status == "Active"',
                    'rule_3': 'brand in ("abc", "def")'}

filters_to_apply['table_name'].filter(' and '.join([(filters_to_apply[key]) for key in filters_to_apply.keys() if 'rule' in key]))

I am getting this error: AttributeError: 'str' object has no attribute 'filter'

Please could you advise?

Upvotes: 2

Views: 3127

Answers (2)

Prateek Jain

Reputation: 597

First, the configuration should look as below; that is, input_df should not be a string:

filters_to_apply = {'table_name': input_df,
                    'rule_1': 'not is_deleted',
                    'rule_2': 'status == "Active"',
                    'rule_3': 'brand in ("abc", "def")'}

But if you have registered input_df as a temp table, then you can certainly pass it as a string, like this:

input_df.registerTempTable("input_df")

Assuming you have registered it as a temp table, one way to do that could be as follows:

def prepare_data(config):
    # look up the registered temp table by its name
    df = spark.table(config['table_name'])
    # apply every rule_* condition as a filter
    for key in config.keys():
        if key.startswith("rule_"):
            df = df.filter(config[key])
    return df
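For instance, a minimal usage sketch, assuming input_df has been registered as a temp table and 'table_name' holds the string 'input_df':

input_df.registerTempTable("input_df")

filters_to_apply = {'table_name': 'input_df',
                    'rule_1': 'not is_deleted',
                    'rule_2': 'status == "Active"',
                    'rule_3': 'brand in ("abc", "def")'}

filtered_df = prepare_data(filters_to_apply)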

Also, if you want to learn more about how to automate your code, refer to this link - https://youtu.be/KBRPUDovzmc

Upvotes: 0

boechat107

Reputation: 1724

First of all, it is important to understand why you are getting this error.

As the dictionary filters_to_apply is defined, it maps string keys to string values. Having a variable named input_df has nothing to do with a string containing the characters "input_df"; for Python, they are two completely different things.
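To see it concretely with the dictionary from the question:

value = filters_to_apply['table_name']
print(type(value))              # <class 'str'>, not a DataFrame
value.filter("not is_deleted")  # AttributeError: 'str' object has no attribute 'filter'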

As far as I know, you have two alternatives here:

  1. You can pass the variable input_df (assuming it is defined somewhere) around and apply the filters directly on it (you can have a function in which one of its arguments is a DataFrame, so it can handle different DataFrames).
  2. With the defined variable input_df, you can register a temporary view and retrieve the same DataFrame later (see the sketch just after this list).
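A minimal sketch of the second alternative, using createOrReplaceTempView (the Spark 2+ equivalent of registerTempTable) and assuming a SparkSession named spark:

# register the DataFrame under a name once...
input_df.createOrReplaceTempView("input_df")

# ...later, retrieve the same DataFrame from that string name
same_df = spark.table("input_df")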

My suggestion

Define a function whose arguments are composed of a DataFrame and a sequence of rules.

Example of a function definition:

from pyspark.sql import DataFrame
from typing import Iterable

def my_filter(df: DataFrame, conditions: Iterable[str]) -> DataFrame:
    return df.filter(" and ".join(conditions))

Example of usage:

from pyspark.sql import SparkSession

sparksession = SparkSession.builder.getOrCreate()

df = sparksession.createDataFrame(
    [(1, True, "Active"), (2, False, "Active"), (3, True, "Disabled")],
    ["id", "bool", "status"]
)
df.show()

my_filter(df, ["not bool", "status = 'Active'"]).show()

Results:

+---+-----+--------+
| id| bool|  status|
+---+-----+--------+
|  1| true|  Active|
|  2|false|  Active|
|  3| true|Disabled|
+---+-----+--------+

+---+-----+------+
| id| bool|status|
+---+-----+------+
|  2|false|Active|
+---+-----+------+

Upvotes: 3
