Alex Ortner

Reputation: 1238

Pyspark merge dataframe rows one array is contained in another

I don't even know the best way to phrase this question.

I have the following dataset

df = spark.createDataFrame([
    (["1", "2", "3", "4"],),
    (["1", "2", "3"],),
    (["2", "1", "3"],),
    (["2", "3", "4", "1"],),
    (["6", "7"],)
], ['cycle'])
df.show()

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|   [1, 2, 3]|
|   [2, 1, 3]|
|[2, 3, 4, 1]|
|      [6, 7]|
+------------+

What I would like to have at the end is:

  1. remove the permutations
  2. keep only the row with the largest array, i.e. the one that contains all the other sets

I can use sort_array() and distinct() to get rid of the permutations

from pyspark.sql import functions as f

df.select(f.sort_array("cycle").alias("cycle")).distinct().show()
+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
|   [1, 2, 3]|
+------------+

What I would like to reduce the dataset to with PySpark is:

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
+------------+

So I need to somehow check that [1, 2, 3] is part of [1, 2, 3, 4] and keep only the larger one. Essentially, the Python set operation A.issubset(B), applied over a column in the PySpark/Spark way.

The only way I can currently think of is a horrible iterative loop over every row, which would kill performance.
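For illustration, this is roughly the driver-side loop I have in mind, using plain Python sets (a sketch only; it collects everything to the driver and compares every pair of rows, so it will not scale):

# Rough sketch of the approach I want to avoid: collect to the driver,
# deduplicate via sets (which also removes the permutations), then compare every pair.
cycles = {frozenset(r.cycle) for r in df.collect()}
keep = [sorted(c) for c in cycles
        if not any(c < other for other in cycles)]  # c < other: proper subset
print(keep)  # e.g. [['1', '2', '3', '4'], ['6', '7']]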

Upvotes: 3

Views: 352

Answers (1)

jxc

Reputation: 13998

One way you might try is to first find all cycles that have at least one superset (excluding themselves), by using a self join to find every d2.cycle that satisfies the following conditions:

  • size(array_except(d2.cycle, d1.cycle)) == 0: no item of d2.cycle is missing from d1.cycle (an EMPTY d2.cycle would also satisfy this)
  • size(d2.cycle) < size(d1.cycle): d2.cycle is strictly smaller than d1.cycle

and then do a left_anti join to exclude the above list from the original dataframe, and finally run sort_array and drop_duplicates (or distinct):

from pyspark.sql.functions import expr

df_sub = df.alias('d1').join(
      df.alias('d2')
    , expr('size(array_except(d2.cycle, d1.cycle))==0 AND size(d2.cycle) < size(d1.cycle)')
).select('d2.cycle').distinct()

df_sub.show()
#+---------+
#|    cycle|
#+---------+
#|[1, 2, 3]|
#|[2, 1, 3]|
#+---------+

df.join(df_sub , on=['cycle'], how='left_anti') \
  .withColumn('cycle', expr('sort_array(cycle)')) \
  .distinct() \
  .show()
#+------------+                                                                  
#|       cycle|
#+------------+
#|[1, 2, 3, 4]|
#|      [6, 7]|
#+------------+
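If it helps, here is a quick sanity check of the subset condition on two literal arrays (values picked purely for illustration, not part of the pipeline above):

# size(array_except(A, B)) == 0 means A is a subset of B
spark.sql("""
    SELECT size(array_except(array('1','2','3'), array('1','2','3','4'))) AS subset_size,
           size(array_except(array('6','7'),     array('1','2','3','4'))) AS non_subset_size
""").show()
#+-----------+---------------+
#|subset_size|non_subset_size|
#+-----------+---------------+
#|          0|              2|
#+-----------+---------------+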

Upvotes: 2
