Reputation: 688
I have two Spark RDDs (converted to DataFrames below). The first contains a mapping from integer indices to string ids, and the second contains tuples of related indices:
import spark.implicits._ // needed for toDF (already in scope in spark-shell)
val ids = spark.sparkContext.parallelize(Array[(Int, String)](
(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"))).toDF("index", "idx")
val relationships = spark.sparkContext.parallelize(Array[(Int, Int)](
(1, 3), (2, 3), (4, 5))).toDF("index1", "index2")
I want to join these RDDs (or merge them, use SQL, or whatever is best Spark practice) so that I end up with the related ids instead of the indices.
The combined result should be:
("a", "c"), ("b", "c"), ("d", "e")
Any idea how I can do this efficiently without loading either RDD into an in-memory map? In my scenarios, these RDDs can contain millions of records.
Upvotes: 1
Views: 366
Reputation: 4333
You can approach this by creating two temporary views from the DataFrames, as follows:
relationships.createOrReplaceTempView("relationships");
ids.createOrReplaceTempView("ids");
Next, run the following SQL query. It performs two inner joins between the relationships and ids views, one for each index column, to produce the required result:
// spark.sql is a method on the SparkSession, so no extra import is needed
val result = spark.sql("""select t.index1, id.idx as index2 from
(select id.idx as index1, rel.index2
from relationships rel
inner join
ids id on rel.index1=id.index) t
inner join
ids id
on id.index=t.index2
""")
result.show()
Another approach uses the DataFrame API directly, without creating views:
relationships.as("rel").
join(ids.as("ids"), $"ids.index" === $"rel.index1").as("temp").
join(ids.as("ids"), $"temp.index2"===$"ids.index").
select($"temp.idx".as("index1"), $"ids.idx".as("index2")).show
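For anyone who wants to try the DataFrame approach outside spark-shell, here is a minimal self-contained sketch of the same double join. The local SparkSession settings, the aliases "l" and "r", and the output column names id1 and id2 are my own choices for illustration and are not from the question. It assumes Spark 2.x or later.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("join-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

val ids = Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")).toDF("index", "idx")
val relationships = Seq((1, 3), (2, 3), (4, 5)).toDF("index1", "index2")

// Alias ids twice so each side of a relationship resolves against its own copy.
val left = ids.as("l")
val right = ids.as("r")

val related = relationships
  .join(left, $"index1" === $"l.index")   // look up the id for index1
  .join(right, $"index2" === $"r.index")  // look up the id for index2
  .select($"l.idx".as("id1"), $"r.idx".as("id2"))

// Collected here only to inspect the tiny sample; avoid collect() on large data.
val pairs = related.as[(String, String)].collect().sorted
```

Both joins are ordinary distributed joins, so neither DataFrame has to be pulled into a driver-side map. Calling related.explain() shows which join strategy Spark picked for your data.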
Upvotes: 2