Reputation: 5629
Given two large RDDs, a with a set of (key, value) pairs and b with only keys, what would be the best way to join them such that a keeps only those rows that match the keys of b?
More specifically, this is what I want to do:
val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)
where c contains only the rows of a that match the keys in b, and we can assume that a contains only unique keys. Is there anything like myFilterJoin available?
Note that if b were small enough, I could simply broadcast it as a set and then use it as a filter on a. But let's assume b is large enough for this broadcast to be prohibitively expensive.
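For reference, the broadcast variant mentioned above might look roughly like this. It is a minimal sketch, assuming a local SparkContext and small toy data; the names BroadcastFilterSketch and run are illustrative only. Collecting b to the driver is exactly the step that becomes too expensive when b is large.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastFilterSketch {
  // Returns the rows of a whose key appears in b, sorted by key.
  def run(): Seq[(Int, Double)] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("broadcast-filter"))
    try {
      val a = sc.parallelize(Seq(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
      val b = sc.parallelize(Seq(2, 3, 4, 5))
      // Collect b's keys on the driver and ship them to every executor once.
      // Only viable when the key set fits comfortably in memory.
      val keys = sc.broadcast(b.collect().toSet)
      // Filter a partition by partition; no shuffle is needed.
      val c = a.filter { case (k, _) => keys.value.contains(k) }
      c.collect().toSeq.sortBy(_._1)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```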
What I usually do is add a dummy value to b so that it takes the form (key, dummy), which makes the join possible, and then remove the dummy value in a map. But this seems quite hacky, and I was wondering if there's a better solution.
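The dummy-value workaround can at least be hidden behind a helper. The sketch below is an assumption, not an existing Spark API: FilterJoinOps and myFilterJoin are hypothetical names (myFilterJoin is borrowed from the question). It pairs every key of b with a Unit placeholder, joins, and drops the placeholder with mapValues. The distinct() call guards against duplicate keys in b, which would otherwise duplicate rows of a, at the cost of an extra shuffle.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FilterJoinSketch {
  // Hypothetical helper named after the question's myFilterJoin;
  // Spark itself does not provide a method with this name on RDDs.
  implicit class FilterJoinOps(a: RDD[(Int, Double)]) {
    def myFilterJoin(b: RDD[Int]): RDD[(Int, Double)] =
      a.join(b.distinct().map(k => (k, ()))) // Unit placeholder carries no data
        .mapValues { case (v, _) => v }       // keep a's value, drop the placeholder
  }

  // Returns the filtered rows of a, sorted by key.
  def run(): Seq[(Int, Double)] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("filter-join"))
    try {
      val a = sc.parallelize(Seq(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
      val b = sc.parallelize(Seq(2, 3, 4, 5))
      a.myFilterJoin(b).collect().toSeq.sortBy(_._1)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

If b is known to be free of duplicate keys, the distinct() call can be dropped to save a shuffle.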
Upvotes: 0
Views: 759
Reputation: 22895
Similar to ShemTov's answer, but preserving type safety by using Datasets instead of DataFrames.
(PS: I would recommend just using Datasets instead of RDDs.)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val a = sc.parallelize(List((1 -> 0.0), (2 -> 3.3), (3 -> 5.5), (5 -> 10.11)))
val b = sc.parallelize(List(2, 3, 4, 5))
val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
case (_, (key, value)) => key -> value
}.rdd
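If the goal is only to filter a by the keys of b, a left semi join expresses that directly: it keeps the rows of a that have a match in b and returns only a's columns, so neither a dummy column nor the final map is needed. A minimal sketch with Datasets, assuming Spark 2.x or later and the same toy data; LeftSemiSketch and run are illustrative names.

```scala
import org.apache.spark.sql.SparkSession

object LeftSemiSketch {
  // Returns the rows of a whose key appears in b, sorted by key.
  def run(): Seq[(Int, Double)] = {
    val spark = SparkSession.builder().master("local[*]").appName("left-semi").getOrCreate()
    import spark.implicits._
    try {
      val a = Seq(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11).toDS() // columns _1, _2
      val b = Seq(2, 3, 4, 5).toDS()                               // column value
      // Left semi join: keep a's rows with a matching key in b and
      // return only a's columns, even if b repeats a key.
      val c = a.join(b, a("_1") === b("value"), "left_semi").as[(Int, Double)]
      c.rdd.collect().toSeq.sortBy(_._1)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```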
Upvotes: 1
Reputation: 707
It sounds like you should use an inner join:
import org.apache.spark.sql.DataFrame
import spark.implicits._
val a: DataFrame = spark.sparkContext.parallelize(Seq((1, 2.5), (2, 4.4), (3, 2.1))).toDF("keyA", "value")
val b: DataFrame = spark.sparkContext.parallelize(Seq(3, 5, 1)).toDF("keyB")
val c = a.join(b, $"keyA" === $"keyB", "inner").drop("keyB")
c.show()
+----+-----+
|keyA|value|
+----+-----+
|   1|  2.5|
|   3|  2.1|
+----+-----+
If you want c returned as an RDD[(Int, Double)], you can use:
val d = c.rdd.map(row => (row.getInt(0), row.getDouble(1)))
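A possibly simpler alternative to extracting each Row field by hand is to convert the DataFrame to a typed Dataset first. This sketch assumes c has exactly the two columns keyA (Int) and value (Double) in that order, because as[(Int, Double)] maps columns to tuple fields by position; TypedRddSketch and run are illustrative names.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TypedRddSketch {
  // Returns the joined rows as typed tuples, sorted by key.
  def run(): Seq[(Int, Double)] = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-rdd").getOrCreate()
    import spark.implicits._
    try {
      val a: DataFrame = Seq((1, 2.5), (2, 4.4), (3, 2.1)).toDF("keyA", "value")
      val b: DataFrame = Seq(3, 5, 1).toDF("keyB")
      val c = a.join(b, $"keyA" === $"keyB", "inner").drop("keyB")
      // as[(Int, Double)] binds keyA -> _1 and value -> _2 by position,
      // letting Spark check the column types instead of casting by hand.
      val d: RDD[(Int, Double)] = c.as[(Int, Double)].rdd
      d.collect().toSeq.sortBy(_._1)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```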
Upvotes: 2