PixieDev

Reputation: 307

How to map key/value pairs between two separate RDDs?

I'm still a beginner in Scala and Spark, so I think I'm just being brainless here. I have two RDDs, one of the type:

((String, String), Int) = ((" v67430612_serv78i"," fb_201906266952256"),1)

and the other of the type:

(String, String, String) = (r316079113_serv60i,fb_100007609418328,-795000)

As can be seen, the first two columns of the two RDDs are in the same format. Basically, they are IDs: one is 'tid' and the other is 'uid'.

The question is this:

Is there a way to compare the two RDDs so that the tid and uid are matched in both, and all the data for the matching IDs is combined into a single row without any repetition?

E.g., if I get a match of tid and uid between the two RDDs:

((String, String), Int) = ((" v67430612_serv78i"," fb_201906266952256"),1)

(String, String, String) = (" v67430612_serv78i"," fb_201906266952256",-795000)

then the output is:

((" v67430612_serv78i"," fb_201906266952256",-795000),1)

The IDs in the two RDDs are not in any fixed order; they are random, i.e. the same uid and tid may not appear at the same position in both RDDs.

Also, how will the solution change if the first RDD type stays the same but the second RDD changes to the type:

((String, String, String), Int) = ((daily_reward_android_5.76,fb_193055751144610,81000),1)

I have to do this without the use of Spark SQL.

Upvotes: 0

Views: 2133

Answers (1)

Ramesh Maharjan

Reputation: 41957

I would suggest you convert your RDDs to DataFrames and apply a join, for ease.

Your first DataFrame should be:

+------------------+-------------------+-----+
|tid               |uid                |count|
+------------------+-------------------+-----+
| v67430612_serv78i| fb_201906266952256|1    |
+------------------+-------------------+-----+

The second DataFrame should be:

+------------------+-------------------+-------+
|tid               |uid                |amount |
+------------------+-------------------+-------+
| v67430612_serv78i| fb_201906266952256|-795000|
+------------------+-------------------+-------+
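A minimal sketch of that conversion, assuming a SparkSession in scope named spark and the RDDs named rdd1 and rdd2 as in the question (the variable names are illustrative):

import spark.implicits._

// rdd1: RDD[((String, String), Int)], rdd2: RDD[(String, String, String)]
val df1 = rdd1.map { case ((tid, uid), count) => (tid.trim, uid.trim, count) }
              .toDF("tid", "uid", "count")
val df2 = rdd2.map { case (tid, uid, amount) => (tid.trim, uid.trim, amount) }
              .toDF("tid", "uid", "amount")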

Then getting the final output is just an inner join:

df2.join(df1, Seq("tid", "uid"))

which will give the output:

+------------------+-------------------+-------+-----+
|tid               |uid                |amount |count|
+------------------+-------------------+-------+-----+
| v67430612_serv78i| fb_201906266952256|-795000|1    |
+------------------+-------------------+-------+-----+

Edited

If you want to do it without DataFrames/Spark SQL, there is a join for RDDs too, but you will have to reshape them as below:

rdd2.map(x => ((x._1, x._2), x._3)).join(rdd1).map(y => ((y._1._1, y._1._2, y._2._1), y._2._2)) 

This will work only if you have rdd1 and rdd2 as defined in your question, i.e. ((" v67430612_serv78i"," fb_201906266952256"),1) and (" v67430612_serv78i"," fb_201906266952256",-795000) respectively. You should then have the final output:

(( v67430612_serv78i, fb_201906266952256,-795000),1)
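For readability, the same join can be written with pattern matching instead of tuple accessors; this is equivalent to the one-liner above:

rdd2
  .map { case (tid, uid, amount) => ((tid, uid), amount) }   // key rdd2 by (tid, uid)
  .join(rdd1)                                                // => ((tid, uid), (amount, count))
  .map { case ((tid, uid), (amount, count)) => ((tid, uid, amount), count) }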

Make sure that you trim the values to remove the leading/trailing spaces. This ensures that both RDDs have the same key values while joining; otherwise you might get an empty result.
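For the variant in the question where the second RDD has the type ((String, String, String), Int), a similar re-keying should work. This is only a sketch, under the assumption that the first two fields of the inner tuple are still the tid and uid:

rdd2.map { case ((tid, uid, amount), cnt) => ((tid.trim, uid.trim), (amount, cnt)) }
    .join(rdd1.map { case ((tid, uid), count) => ((tid.trim, uid.trim), count) })
// => ((tid, uid), ((amount, cnt), count)); reshape with a final map as needed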

Upvotes: 2
