Reputation: 71
I am looking to get the intersection of two RDDs in PySpark. They look like the following:
rdd1 = sc.parallelize(["abc","def", "ghi"])
rdd2 = sc.parallelize([["abc","123"], ["df","345"], ["ghi","678"]])
Is it possible using pyspark's rdd operators to get:
intersection_rdd --> [["abc","123"], ["ghi","678"]]
Upvotes: 1
Views: 529
Reputation: 756
You can try this to solve your problem:
rdd1 = sc.parallelize([[x] for x in ["abc","def", "ghi"]])
rdd2 = sc.parallelize([["abc","123"], ["df","345"], ["ghi","678"]])
df1 = rdd1.toDF(['key'])
df2 = rdd2.toDF(['key', 'value'])
intersect = df1.join(df2, 'key').orderBy('key')
intersect.show()
Output:
+---+-----+
|key|value|
+---+-----+
|abc| 123|
|ghi| 678|
+---+-----+
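If you need the result back as an RDD rather than a DataFrame (the question asks for an RDD), a minimal sketch of the conversion:
# Convert the joined DataFrame's Row objects back to plain tuples.
intersection_rdd = intersect.rdd.map(tuple)
intersection_rdd.collect()
## Out: [('abc', '123'), ('ghi', '678')]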
Upvotes: 1
Reputation: 3254
A quick approach via the PySpark RDD API is to use join, but note that join requires both RDDs to contain (key, value) pairs. To do this, we will start with a version of your example below:
rdd1 = sc.parallelize([["abc"],["def"], ["ghi"]])
rdd2 = sc.parallelize([["abc", 123],["df", 345], ["ghi", 678]])
You can then create rdd1a so it has the same (key, value) shape as rdd2, using a dummy value:
rdd1a = rdd1.map(lambda x: (x[0], 1))
And then you can run join:
rdd1a.join(rdd2).map(lambda x: (x[0], x[1][1])).collect()
## Out[25]: [('abc', 123), ('ghi', 678)]
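As an aside, this is why the built-in RDD.intersection() is not the right tool here: it compares whole elements for equality, so a bare key never matches a (key, value) pair. A quick illustration:
# intersection() matches complete elements, not keys, so a key-only
# RDD and a (key, value) RDD have nothing in common.
sc.parallelize(["abc", "ghi"]).intersection(
    sc.parallelize([("abc", 123), ("ghi", 678)])
).collect()
## Out: []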
Note that this may not be the most performant approach for large RDDs, but it is a quick way to get the result.
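If rdd1 is small enough for its keys to fit in memory on every executor, a shuffle-free alternative (a sketch, not part of the original answer) is to broadcast the keys and filter rdd2:
# Broadcast the (small) set of wanted keys to every executor,
# then filter rdd2 locally -- no shuffle, unlike join().
wanted = sc.broadcast(set(rdd1.map(lambda x: x[0]).collect()))
rdd2.filter(lambda kv: kv[0] in wanted.value).collect()
## Out: [['abc', 123], ['ghi', 678]]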
Another approach would be to utilize DataFrames, per below:
df1 = rdd1.toDF(['col'])
df2 = rdd2.toDF(['col', 'value'])
df_intersect = df1.join(df2, df1.col == df2.col, 'inner').select(df1.col, df2.value)
df_intersect.show()
with the output being:
+---+-----+
|col|value|
+---+-----+
|ghi| 678|
|abc| 123|
+---+-----+
Upvotes: 2