Neuronix

Reputation: 65

How to generate unique pairs of values in PySpark

I have a PySpark dataframe:

+--------+------+
|numbers1|words1|
+--------+------+
|       1| word1|
|       1| word2|
|       1| word3|
|       2| word4|
|       2| word5|
|       3| word6|
|       3| word7|
|       3| word8|
|       3| word9|
+--------+------+

I want to produce another dataframe containing all unique pairs of words within each group. For the example above, the result would be:

ID     wordA    wordB
1      word1    word2
1      word1    word3
1      word2    word3
2      word4    word5
3      word6    word7
3      word6    word8
3      word6    word9
3      word7    word8
3      word7    word9
3      word8    word9

I know I can do this in pandas with the following code:

import pandas as pd
from itertools import combinations

ndf = (df.groupby('ID')['words']
         .apply(lambda x: list(combinations(x.values, 2)))
         .apply(pd.Series)
         .stack()
         .reset_index(level=0, name='words'))

But now I need to implement this with Spark APIs alone, without the itertools library. How can I rewrite this without combinations, using the DataFrame or RDD API?
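For reference, the example DataFrame can be created as follows (a minimal sketch; it assumes an active SparkSession named spark):

df = spark.createDataFrame(
    [(1, "word1"), (1, "word2"), (1, "word3"),
     (2, "word4"), (2, "word5"),
     (3, "word6"), (3, "word7"), (3, "word8"), (3, "word9")],
    ["numbers1", "words1"],
)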

Upvotes: 0

Views: 995

Answers (2)

Steven

Reputation: 15258

Here is a solution using combinations inside a UDF. It follows the same logic as the pandas code you showed.


from itertools import combinations
from pyspark.sql import types as T, functions as F

# Collect all the words of each group into a single array column.
df_agg = df.groupBy("numbers1").agg(F.collect_list("words1").alias("words_list"))

# UDF that returns every 2-element combination of the array
# as an array of (wordA, wordB) structs.
@F.udf(
    T.ArrayType(
        T.StructType(
            [
                T.StructField("wordA", T.StringType(), True),
                T.StructField("wordB", T.StringType(), True),
            ]
        )
    )
)
def combi(words_list):
    return list(combinations(words_list, 2))

df_agg = df_agg.withColumn("combinations", combi(F.col("words_list")))

# One row per pair, with the struct fields flattened into columns.
new_df = df_agg.withColumn("combination", F.explode("combinations")).select(
    "numbers1",
    F.col("combination.wordA").alias("wordA"),
    F.col("combination.wordB").alias("wordB"),
)

new_df.show()
+--------+------+------+
|numbers1| wordA| wordB|
+--------+------+------+
|       1| word1| word2|
|       1| word1| word3|
|       1| word2| word3|
|       3| word6| word7|
|       3| word6| word8|
|       3| word6| word9|
|       3| word7| word8|
|       3| word7| word9|
|       3| word8| word9|
|       2| word4| word5|
+--------+------+------+
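Note that a Python UDF carries serialization overhead, since each group's array is shipped to a Python worker and back. The same pairing can also be expressed with Spark's built-in higher-order functions; the following is an untested sketch assuming Spark 2.4+ (where transform, slice, and flatten are available), with pairs and native_df as hypothetical names:

from pyspark.sql import functions as F

df_agg = df.groupBy("numbers1").agg(F.collect_list("words1").alias("words_list"))

# For each element x at 0-based index i, pair it with every later element.
# slice() uses 1-based positions, so the elements after x start at i + 2.
pairs = F.expr("""
    flatten(
      transform(words_list, (x, i) ->
        transform(
          slice(words_list, i + 2, size(words_list) - i - 1),
          y -> struct(x AS wordA, y AS wordB)
        )
      )
    )
""")

native_df = df_agg.withColumn("combination", F.explode(pairs)).select(
    "numbers1",
    F.col("combination.wordA"),
    F.col("combination.wordB"),
)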

Upvotes: 0

Lamanus

Reputation: 13581

Here is my attempt with the DataFrame API.

import pyspark.sql.functions as f

# Self-join on the group key, with the word column renamed on the right side;
# the strict inequality keeps each unordered pair exactly once.
df.join(df.withColumnRenamed('words1', 'words2'), ['numbers1'], 'outer') \
  .filter('words1 < words2').show(10, False)

+--------+------+------+
|numbers1|words1|words2|
+--------+------+------+
|1       |word1 |word3 |
|1       |word1 |word2 |
|1       |word2 |word3 |
|2       |word4 |word5 |
|3       |word6 |word9 |
|3       |word6 |word8 |
|3       |word6 |word7 |
|3       |word7 |word9 |
|3       |word7 |word8 |
|3       |word8 |word9 |
+--------+------+------+
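The filter words1 < words2 does two things: it drops the self-pairs produced by the join and keeps only one ordering of each pair. This assumes the words are unique within each group; a pair of two equal words would be filtered out. Also, since both sides of the self-join come from the same DataFrame, an inner join would give the same result as the outer join here.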

Upvotes: 1
