Oriol Nieto

Reputation: 5629

Join two RDDs, one of which has only keys and no values

Given two large RDDs, a with a set of (key, value) pairs and b with only keys, what would be the best way to join them such that a keeps only those rows that match the keys of b?

More specifically, this is what I want to do:

val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)

where c contains only the rows of a that match the keys in b, and we can assume that a contains only unique keys. Is there anything like myFilterJoin available?

Note that if b were small enough, I could simply broadcast it as a set and use it to filter a. But let's assume b is large enough for this broadcast to be prohibitively expensive.

What I usually do is add a dummy value to b so that it takes the form (key, dummy), do the join, and then remove the dummy value in a map. But this seems quite hacky, and I was wondering if there is a better solution.
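For reference, the dummy-value workaround described above can be written out as follows. This is a minimal sketch, assuming a local Spark context and small example data (the names `a`, `b`, `c` match the question; everything else is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("filter-join-sketch").setMaster("local[*]"))

val a: RDD[(Int, Double)] = sc.parallelize(Seq(1 -> 0.5, 2 -> 1.5, 3 -> 2.5))
val b: RDD[Int] = sc.parallelize(Seq(2, 3, 4))

// Attach a unit dummy to each key of b, inner-join, then drop the dummy:
// the join keeps exactly those rows of a whose key appears in b.
val c: RDD[(Int, Double)] = a.join(b.map(k => (k, ()))).mapValues { case (v, _) => v }
```

Using `()` as the dummy keeps the shuffled payload for b as small as possible.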

Upvotes: 0

Views: 759

Answers (2)

Similar to ShemTov's answer but preserving type safety by using Datasets instead of DataFrames.
(PS: I would recommend you to just use Datasets instead of RDDs)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val a = sc.parallelize(List(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
val b = sc.parallelize(List(2, 3, 4, 5))

// b.toDS has a single column named "value"; a.toDS has columns "_1" and "_2".
// Join on the key, then keep only the (key, value) pairs coming from a.
val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
  case (_, (key, value)) => key -> value
}.rdd
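As a side note, Spark also has a left-semi join type, which expresses "keep the rows of a whose key appears in b" directly, without producing any columns from b that would need dropping. A sketch under the same assumptions (local Spark session, same example data as above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val a = sc.parallelize(List(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
val b = sc.parallelize(List(2, 3, 4, 5))

// A left-semi join keeps only the left-side rows with a match on the right,
// and never duplicates a row even if the right side had repeated keys.
val c = a.toDF("key", "value")
  .join(b.toDF("key"), Seq("key"), "left_semi")
  .rdd
  .map(row => (row.getInt(0), row.getDouble(1)))
```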

Upvotes: 1

ShemTov

Reputation: 707

It sounds like you should use an inner join:

  import spark.implicits._

  val a: DataFrame = spark.sparkContext.parallelize(Seq((1, 2.5), (2, 4.4), (3, 2.1))).toDF("keyA", "value")
  val b: DataFrame = spark.sparkContext.parallelize(Seq(3, 5, 1)).toDF("keyB")

  val c = a.join(b, $"keyA" === $"keyB", "inner").drop("keyB")

  c.show()

+----+-----+
|keyA|value|
+----+-----+
|   1|  2.5|
|   3|  2.1|
+----+-----+

and if you want c back as an RDD[(Int, Double)], you can use:

  // The typed getters avoid the explicit asInstanceOf casts.
  val d = c.rdd.map(row => (row.getInt(0), row.getDouble(1)))

Upvotes: 2
