dtj

Reputation: 281

How to check if array column is inside another column array in PySpark dataframe

Suppose I have the following case:

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("ev", ArrayType(StringType()), True),
    StructField("ev2", ArrayType(StringType()), True),])
df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]},
                            {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]},
                            {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]},
                            {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}],
                           schema=schema)

Which gives me:

df.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se1|[ev11, ev12]|      [ev11]|
|se2|      [ev11]|[ev11, ev12]|
|se3|      [ev21]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

I want to create a new boolean column (or select only the true cases) flagging the rows where the contents of the "ev" column are contained in the "ev2" column, returning:

df_target.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se2|      [ev11]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

or:

df_target.show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+
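Per row, the condition amounts to a subset check; in plain Python terms (a sketch of the intent, not Spark code, and `ev_in_ev2` is just an illustrative name):

```python
# Row-level condition for the new column: every element of ev is also in ev2
def ev_in_ev2(ev, ev2):
    return set(ev).issubset(ev2)

print(ev_in_ev2(["ev11"], ["ev11", "ev12"]))   # True  (row se2)
print(ev_in_ev2(["ev11", "ev12"], ["ev11"]))   # False (row se1)
```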

I tried using the isin method:

df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|  false|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

But it looks like it only checks whether the two arrays are identical.

I also tried the array_contains function from pyspark.sql.functions, but it only accepts a single value to check for, not an array.

I am having difficulty even searching for this, since I'm not sure how to phrase the problem.

Thanks!

Upvotes: 5

Views: 18063

Answers (4)

abiratsis

Reputation: 7336

One more implementation for Spark >= 2.4.0, avoiding a UDF and using the built-in array_except function:

from pyspark.sql.functions import size, array_except

def is_subset(a, b):
    # an empty array_except(a, b) means every element of a appears in b
    return size(array_except(a, b)) == 0

df.withColumn("is_subset", is_subset(df.ev, df.ev2)).show()

Output:

+---+------------+------------+---------+
| id|          ev|         ev2|is_subset|
+---+------------+------------+---------+
|se1|[ev11, ev12]|      [ev11]|    false|
|se2|      [ev11]|[ev11, ev12]|     true|
|se3|      [ev21]|[ev11, ev12]|    false|
|se4|[ev21, ev22]|[ev21, ev22]|     true|
+---+------------+------------+---------+
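array_except(a, b) returns the distinct elements of a that are not in b, so an empty result means every element of a appears in b. The same logic, mirrored in plain Python for clarity (`array_except_py` and `is_subset_py` are hypothetical helpers, not Spark APIs):

```python
def array_except_py(a, b):
    # mirrors Spark's array_except: distinct elements of a absent from b
    return [x for x in dict.fromkeys(a) if x not in b]

def is_subset_py(a, b):
    # empty difference means a is a subset of b
    return len(array_except_py(a, b)) == 0

print(is_subset_py(["ev11"], ["ev11", "ev12"]))   # True
print(is_subset_py(["ev11", "ev12"], ["ev11"]))   # False
```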

Upvotes: 11

information_interchange

Reputation: 3128

I created a Spark UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

antecedent_inside_predictions = udf(
    lambda antecedent, prediction: all(elem in prediction for elem in antecedent),
    BooleanType(),
)

and then use it in a join as so:

fp_predictions = filtered_rules.join(personal_item_recos, antecedent_inside_predictions("antecedent", "item_predictions"))

Note that I needed to enable crossJoins:

spark.conf.set('spark.sql.crossJoin.enabled', True)

Finally, I extract the particular item I want from the consequent as follows:

fp_predictions = fp_predictions.withColumn("ITEM_SK", fp_predictions.consequent.getItem(0))
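Using a UDF as the join condition behaves like a cross join followed by a filter; the effect, in plain Python with illustrative data (these names are made up, not from the answer above):

```python
# Illustrative stand-ins for filtered_rules and personal_item_recos
rules = [(["ev11"], "r1"), (["ev21", "ev22"], "r2")]
recos = [("u1", ["ev11", "ev12"]), ("u2", ["ev21"])]

# Cross join, then keep pairs where the antecedent is inside the predictions
matched = [
    (rule_id, user)
    for antecedent, rule_id in rules
    for user, preds in recos
    if all(e in preds for e in antecedent)
]
print(matched)  # [('r1', 'u1')]
```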

Upvotes: 0

PSAfrance

Reputation: 11

Alternatively, you can use:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# without an explicit BooleanType, udf defaults to returning strings
subsetOf = udf(lambda A, B: set(A).issubset(set(B)), BooleanType())
df.withColumn("evInEv2", subsetOf(df.ev, df.ev2)).show()

Upvotes: 1

mtoto

Reputation: 24198

Here's an option using a udf, where we check the size of the set difference between the columns ev and ev2. When the resulting set is empty, i.e. all elements of ev are contained within ev2, we return True; otherwise False.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def contains(x, y):
    # True when every element of x also appears in y
    return len(set(x) - set(y)) == 0

contains_udf = udf(contains, BooleanType())
df.withColumn("evInEv2", contains_udf(df.ev, df.ev2)).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

Upvotes: 6
