Reputation: 65
I have a PySpark dataframe:
+--------+------+
|numbers1|words1|
+--------+------+
| 1| word1|
| 1| word2|
| 1| word3|
| 2| word4|
| 2| word5|
| 3| word6|
| 3| word7|
| 3| word8|
| 3| word9|
+--------+------+
I want to produce another dataframe that contains all pairs of words within each group. So the result for the above would be:
ID wordA wordB
1 word1 word2
1 word1 word3
1 word2 word3
2 word4 word5
3 word6 word7
3 word6 word8
3 word6 word9
3 word7 word8
3 word7 word9
3 word8 word9
I know I can do this with pandas:
import pandas as pd
from itertools import combinations
ndf = df.groupby('numbers1')['words1'] \
    .apply(lambda x: list(combinations(x.values, 2))) \
    .apply(pd.Series).stack().reset_index(level=0, name='words')
But now I need to implement this with just the Spark APIs and without the itertools library. How can I rewrite this without combinations, using a DataFrame or RDD?
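To be clear about the transformation I want, here it is sketched in plain Python, with the groups taken from the example dataframe above:

```python
from itertools import combinations

# groups taken from the example dataframe above
groups = {
    1: ["word1", "word2", "word3"],
    2: ["word4", "word5"],
    3: ["word6", "word7", "word8", "word9"],
}

# one row per (id, wordA, wordB) pair within a group
rows = [(gid, a, b)
        for gid, words in groups.items()
        for a, b in combinations(words, 2)]
print(len(rows))  # 10 rows, matching the expected output
```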
Upvotes: 0
Views: 995
Reputation: 15258
Here is a solution using combinations inside a UDF. It uses the same logic as the pandas code you showed.
from itertools import combinations
from pyspark.sql import types as T, functions as F

# collect each group's words into a single array column
df_agg = df.groupBy("numbers1").agg(F.collect_list("words1").alias("words_list"))

@F.udf(
    T.ArrayType(
        T.StructType(
            [
                T.StructField("wordA", T.StringType(), True),
                T.StructField("wordB", T.StringType(), True),
            ]
        )
    )
)
def combi(words_list):
    return list(combinations(words_list, 2))

# build the pair array per group, then explode it into one row per pair
df_agg = df_agg.withColumn("combinations", combi(F.col("words_list")))
new_df = df_agg.withColumn("combination", F.explode("combinations")).select(
    "numbers1",
    F.col("combination.wordA").alias("wordA"),
    F.col("combination.wordB").alias("wordB"),
)
new_df.show()
new_df.show()
+--------+------+------+
|numbers1| wordA| wordB|
+--------+------+------+
| 1| word1| word2|
| 1| word1| word3|
| 1| word2| word3|
| 3| word6| word7|
| 3| word6| word8|
| 3| word6| word9|
| 3| word7| word8|
| 3| word7| word9|
| 3| word8| word9|
| 2| word4| word5|
+--------+------+------+
Upvotes: 0
Reputation: 13581
Here is my approach using a self-join on the dataframe.
# self-join on the group key, then keep each pair once via the ordering filter
df.join(df.withColumnRenamed('words1', 'words2'), ['numbers1'], 'outer') \
  .filter('words1 < words2') \
  .show(10, False)
+--------+------+------+
|numbers1|words1|words2|
+--------+------+------+
|1 |word1 |word3 |
|1 |word1 |word2 |
|1 |word2 |word3 |
|2 |word4 |word5 |
|3 |word6 |word9 |
|3 |word6 |word8 |
|3 |word6 |word7 |
|3 |word7 |word9 |
|3 |word7 |word8 |
|3 |word8 |word9 |
+--------+------+------+
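The `words1 < words2` filter keeps exactly one ordered copy of each pair. Assuming the words are distinct within a group, this matches what `itertools.combinations` yields on the sorted word list, which can be checked in plain Python without a Spark session:

```python
from itertools import combinations

group = ["word6", "word7", "word8", "word9"]

# self-join then filter: every ordered pair (a, b) with a < b
join_pairs = sorted((a, b) for a in group for b in group if a < b)

# itertools.combinations on the sorted list yields the same pairs
combo_pairs = sorted(combinations(sorted(group), 2))

assert join_pairs == combo_pairs
print(len(join_pairs))  # 6 pairs for a group of 4 words
```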
Upvotes: 1