proximator

Reputation: 688

How to join two Spark RDDs

I have two Spark RDDs: the first contains a mapping between indices and string ids, and the second contains tuples of related indices.

val ids = spark.sparkContext.parallelize(Array[(Int, String)](
      (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"))).toDF("index", "idx")


val relationships = spark.sparkContext.parallelize(Array[(Int, Int)](
  (1, 3), (2, 3), (4, 5))).toDF("index1", "index2")

I want to somehow join these RDDs (or merge them, or use SQL, or whatever the Spark best practice is) so that I end up with the related ids instead.

The result of my combined RDD should return:

("a", "c"), ("b", "c"), ("d", "e")

Any idea how I can achieve this operation in an optimal way, without loading either RDD into an in-memory map? (In my scenarios, these RDDs can potentially hold millions of records.)

Upvotes: 1

Views: 366

Answers (1)

tourist

Reputation: 4333

You can approach this by creating two views from the DataFrames, as follows:

relationships.createOrReplaceTempView("relationships");
ids.createOrReplaceTempView("ids");

Next, run the following SQL query, which performs an inner join between the relationships and ids views (once for each index column) to generate the required result:

val result = spark.sql("""
  select t.index1, id.idx as index2
  from (select id.idx as index1, rel.index2
        from relationships rel
        inner join ids id on rel.index1 = id.index) t
  inner join ids id on id.index = t.index2
""")

result.show()

Another approach uses the DataFrame API directly, without creating views:

relationships.as("rel")
  .join(ids.as("ids"), $"ids.index" === $"rel.index1").as("temp")
  .join(ids.as("ids"), $"temp.index2" === $"ids.index")
  .select($"temp.idx".as("index1"), $"ids.idx".as("index2"))
  .show
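Since the question title asks about RDDs, the same result can also be produced with the RDD API itself, by keying the data as pair RDDs and calling `join` twice. This is a minimal sketch, assuming `sc` is your `SparkContext` (the data never has to be collected into a driver-side map; `join` shuffles by key across the cluster):

```scala
// Pair RDDs: (index, id) and (index1, index2)
val ids = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")))
val relationships = sc.parallelize(Seq((1, 3), (2, 3), (4, 5)))

// Join on index1: yields (index1, (index2, id1))
val result = relationships
  .join(ids)
  // Re-key on index2, keeping the first resolved id
  .map { case (_, (index2, id1)) => (index2, id1) }
  // Join on index2: yields (index2, (id1, id2))
  .join(ids)
  .map { case (_, (id1, id2)) => (id1, id2) } // RDD[(String, String)]
```

`result.collect()` should contain ("a","c"), ("b","c") and ("d","e"), though the order is not guaranteed after the shuffles.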

Upvotes: 2
