ecako

Reputation: 71

Pyspark intersection

I am looking to get the intersection of two RDDs in pyspark. They look like the following:

rdd1 = sc.parallelize(["abc","def", "ghi"])
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"]])

Is it possible using pyspark's rdd operators to get:

intersection_rdd --> ["abc","123"] ["ghi","678"]

Upvotes: 1

Views: 529

Answers (2)

Rahul Gupta

Reputation: 756

You can try this to solve your problem:

rdd1 = sc.parallelize([[x] for x in ["abc","def", "ghi"]])
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"]])    
df1 = rdd1.toDF(['key'])
df2 = rdd2.toDF(['key', 'value'])
intersect = df1.join(df2, 'key').orderBy('key')
intersect.show()

Output:

+---+-----+
|key|value|
+---+-----+
|abc|  123|
|ghi|  678|
+---+-----+
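
If you need the result back in RDD form, as the [key, value] pairs asked for in the question, a small follow-up sketch (reusing the intersect DataFrame above):

# .rdd exposes the DataFrame's underlying RDD of Row objects;
# converting each Row to a list gives plain [key, value] pairs.
intersect.rdd.map(list).collect()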

Upvotes: 1

Denny Lee

Reputation: 3254

A quick approach via PySpark RDDs is to use join, but note that it requires both RDDs to consist of key-value pairs. To do this, we will start with your example below:

rdd1 = sc.parallelize([["abc"],["def"], ["ghi"]])
rdd2 = sc.parallelize([["abc", 123],["df", 345], ["ghi", 678]])

You can then create rdd1a so it has the same key-value structure as rdd2.

rdd1a = rdd1.map(lambda x: (x[0], 1))

And then you can run join:

rdd1a.join(rdd2).map(lambda x: (x[0], x[1][1])).collect()
## Out[25]: [('abc', 123), ('ghi', 678)] 

Note that this may not be the most performant approach for large RDDs, but it is a quick way to get the result.
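
If rdd1 is small enough to collect to the driver (an assumption, not something stated in the question), a broadcast-and-filter sketch avoids the shuffle that join incurs:

# Broadcast the set of keys from rdd1, then keep only the rdd2 elements
# whose first entry is in that set.
keys = sc.broadcast(set(rdd1.map(lambda x: x[0]).collect()))
rdd2.filter(lambda kv: kv[0] in keys.value).collect()
## e.g. [['abc', 123], ['ghi', 678]]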

Another approach would be to use DataFrames, per below:

df1 = rdd1.toDF(['col'])
df2 = rdd2.toDF(['col', 'value'])
df_intersect = df1.join(df2, df1.col == df2.col, 'inner').select(df1.col, df2.value)
df_intersect.show()

with the output being:

+---+-----+
|col|value|
+---+-----+
|ghi|  678|
|abc|  123|
+---+-----+
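
The same inner join can also be written in Spark SQL; a minimal sketch, assuming Spark 2.0+ with a SparkSession available as spark:

# Register both DataFrames as temporary views and run the equivalent query.
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("SELECT t1.col, t2.value FROM t1 JOIN t2 ON t1.col = t2.col").show()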

Upvotes: 2
