jj_coder

Reputation: 53

PySpark - Combine a list of filtering conditions

For starters, let me define a sample dataframe and import the sql functions:

import pyspark.sql.functions as func

row_data = [(1, 1, 1), (1, 1, 2), (1, 1, 3),
           (1, 2, 1), (1, 2, 2), (1, 2, 3),
           (2, 1, 1), (2, 1, 2), (2, 1, 3),
           (2, 2, 1), (2, 2, 2), (2, 2, 3),
           (2, 2, 4), (2, 2, 5), (2, 2, 6)]

test_df = spark.createDataFrame(row_data, ["A", "B", "C"])

test_df.show()

This returns the following dataframe:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  1|  2|  1|
|  1|  2|  2|
|  1|  2|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  2|  2|  1|
|  2|  2|  2|
|  2|  2|  3|
|  2|  2|  4|
|  2|  2|  5|
|  2|  2|  6|
+---+---+---+

Now let's say I have a list of filtering conditions, for example, one specifying that columns A and B should both equal 1:

l = [func.col("A") == 1, func.col("B") == 1]

I can combine these two conditions as follows and then filter the dataframe, obtaining the following result:

t = l[0] & l[1]
test_df.filter(t).show()

Result:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
+---+---+---+

MY QUESTION

If l is a list of unknown length n (that is, a list of n filtering conditions) instead of only two, what is the most Pythonic way, or a one-liner, to logically combine them in an & or | manner?

all() and any() will not work, because they are designed for plain iterables of boolean values; they try to convert each element into a Python bool, which Column expressions do not allow.
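
For example, a quick check (the exact error message may differ between PySpark versions):

# all() calls bool() on each element, and a PySpark Column refuses that conversion
try:
    all(l)
except ValueError as e:
    print(e)  # e.g. "Cannot convert column into bool: please use '&' for 'and', ..."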

As an example, let us say that l = [func.col("A") == 1, func.col("B") == 1, func.col("C") == 2].

Help would be much appreciated.

Upvotes: 1

Views: 1158

Answers (2)

Chris

Reputation: 16172

You could use reduce, or a loop. The execution plan in Spark will be the same for both, so I believe it's just a matter of preference.

# apply each condition as a separate where(); Spark folds them into a single Filter
for c in l:
  test_df = test_df.where(c)

test_df.explain()

Produces

== Physical Plan ==
*(1) Filter ((isnotnull(A#11487L) AND isnotnull(B#11488L)) AND ((A#11487L = 1) AND (B#11488L = 1)))
+- *(1) Scan ExistingRDD[A#11487L,B#11488L,C#11489L]

and

from functools import reduce

test_df = test_df.where(reduce(lambda x, y: x & y, l))
test_df.explain()

Produces

== Physical Plan ==
*(1) Filter ((isnotnull(A#11487L) AND isnotnull(B#11488L)) AND ((A#11487L = 1) AND (B#11488L = 1)))
+- *(1) Scan ExistingRDD[A#11487L,B#11488L,C#11489L]
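
If you prefer to avoid the lambda, the standard library's operator.and_ is an equivalent drop-in (a minor stylistic variant, not from the original answer):

import operator
from functools import reduce

# operator.and_(x, y) evaluates x & y, so this builds the same chained condition
test_df = test_df.where(reduce(operator.and_, l))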

Upvotes: 1

samkart

Reputation: 6644

You can use reduce() from functools.

from functools import reduce

reduce(lambda x, y: x & y, l)

reduce() will cumulatively apply the given function, resulting in a chained AND expression.

Printing the resulting Column shows the chained operation:

# Column<'((((A = 1) AND (B = 1)) AND (C = 0)) AND (D = 0))'>

Note that I extended your list to include 4 conditions.
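
For completeness, the same pattern covers the OR case from the question; just chain with | instead of & (a sketch in the spirit of this answer):

from functools import reduce

# match rows satisfying ANY of the conditions
any_cond = reduce(lambda x, y: x | y, l)
test_df.filter(any_cond).show()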

Upvotes: 4
