Reputation: 1330
I need to build a parameterized solution to run different filters. For example, I am currently using the query below to apply filters on a DataFrame:
input_df.filter("not is_deleted and status == 'Active' and brand in ('abc', 'def')")
I need to change this approach so that the query is built from a configuration:
filters_to_apply = {'table_name' : 'input_df',
                    'rule_1' : 'not is_deleted',
                    'rule_2' : 'status == "Active"',
                    'rule_3' : 'brand in ("abc", "def")'
                    }
filters_to_apply['table_name'].filter(' and '.join([(filters_to_apply[key]) for key in filters_to_apply.keys() if 'rule' in key]))
I am getting this error: AttributeError: 'str' object has no attribute 'filter'
Could you please advise?
Upvotes: 2
Views: 3127
Reputation: 597
Firstly, the configuration should look as below; that is, input_df should not be a string:
filters_to_apply = {'table_name' : input_df,
                    'rule_1' : 'not is_deleted',
                    'rule_2' : 'status == "Active"',
                    'rule_3' : 'brand in ("abc", "def")'}
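With the table reference stored as an actual DataFrame, your original join-based call works as written. A minimal sketch, assuming the corrected filters_to_apply above:
# Combine every 'rule' entry into one predicate and apply it to the DataFrame
combined = ' and '.join(filters_to_apply[key] for key in filters_to_apply if 'rule' in key)
result_df = filters_to_apply['table_name'].filter(combined)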
But if you have registered input_df as a temp table, then you can definitely pass it as a string, i.e. as below:
input_df.registerTempTable("input_df")
Assuming you have registered it as a temp table, one way to do that could be as follows:
def prepare_data(config):
    # Look up the registered temp table by name, then apply every rule in turn
    df = spark.table(config['table_name'])
    for key in config.keys():
        if key.startswith("rule_"):
            df = df.filter(config[key])
    return df
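For example, with your configuration (a sketch, assuming input_df was registered as a temp table named "input_df" as above):
filters_to_apply = {'table_name' : 'input_df',
                    'rule_1' : 'not is_deleted',
                    'rule_2' : 'status == "Active"',
                    'rule_3' : 'brand in ("abc", "def")'}
filtered_df = prepare_data(filters_to_apply)
filtered_df.show()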
Also, if you want to learn more about how to automate your code, refer to this link - https://youtu.be/KBRPUDovzmc
Upvotes: 0
Reputation: 1724
First of all, it is important to understand why you are getting this error.
The way the dictionary filters_to_apply is defined, it is mapping string keys to string values. Having a defined variable named input_df has nothing to do with a string containing the characters "input_df". For Python, they are two completely different things.
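In other words, a minimal illustration of the reported error:
config = {'table_name' : 'input_df'}            # the value is just a string
config['table_name'].filter("not is_deleted")   # AttributeError: 'str' object has no attribute 'filter'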
As far as I know, you have two alternatives here:
- Pass the DataFrame input_df (assuming it is defined somewhere) around and apply the filters directly on it (you can have a function in which one of its arguments is a DataFrame, so it can handle different DataFrames).
- If you want to keep referring to the DataFrame by its name as a string, you can register a temporary view and retrieve the same DataFrame later.
Taking the first alternative: define a function whose arguments are composed of a DataFrame and a sequence of rules.
Example of a function definition:
from pyspark.sql import DataFrame
from typing import Iterable
def my_filter(df: DataFrame, conditions: Iterable[str]) -> DataFrame:
return df.filter(" and ".join(conditions))
Example of usage:
df = sparksession.createDataFrame(
    [(1, True, "Active"), (2, False, "Active"), (3, True, "Disabled")],
    ["id", "bool", "status"]
)
df.show()
my_filter(df, ["not bool", "status = 'Active'"]).show()
Results:
+---+-----+--------+
| id| bool| status|
+---+-----+--------+
| 1| true| Active|
| 2|false| Active|
| 3| true|Disabled|
+---+-----+--------+
+---+-----+------+
| id| bool|status|
+---+-----+------+
| 2|false|Active|
+---+-----+------+
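If you prefer the second alternative, a minimal sketch (reusing df and sparksession from the example above) could be:
# Register the DataFrame under a name so it can later be looked up as a string
df.createOrReplaceTempView("input_df")
# Retrieve the same DataFrame from the catalog by name and apply the rules
my_filter(sparksession.table("input_df"), ["not bool", "status = 'Active'"]).show()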
Upvotes: 3