user3803714

Reputation: 5389

Spark RDD: set difference

import org.apache.spark.rdd.RDD

val data: RDD[(String, Array[Int])] = sc.parallelize(Seq(
  ("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))

val codes = sc.parallelize(Seq(2, 3, 12, 13))

val result = data.map { case (id, values) => (id, values.diff(codes)) }

I would like to get the result as:

val result: Array[(String, Array[Int])] = Array(
  ("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)

However, when I try to compute the set difference this way, I get a type mismatch error.

Upvotes: 1

Views: 2785

Answers (1)

zero323

Reputation: 330083

If you want to apply an operation against a local data structure, there is no reason to parallelize codes. Just use mapValues like this:

val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))
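
A quick check on the driver, as a minimal sketch (assuming the data RDD from the question and a live SparkContext sc):

// Collect and print the per-id results; `data` and `codes` are as defined above
result.collect().foreach { case (id, values) =>
  println(s"$id -> ${values.mkString("Array(", ", ", ")")}")
}
// 100 -> Array(1, 4, 5)
// 1000 -> Array(10, 11, 14)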

If codes won't fit into memory, you'll have to do something slightly more complicated:

// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))

data
  .flatMapValues(x => x)  // Flatten values (k, vs) => (k, v)
  .map(_.swap) // Swap order => (v, k)
  .subtractByKey(codes) // Difference
  .map(_.swap) // Swap order => (k, v)
  .groupByKey  // Group => (k, vs)
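
Note that groupByKey leaves the values as Iterable[Int] rather than Array[Int]. If you want exactly the shape asked for in the question, a final mapValues(_.toArray) restores it. A minimal end-to-end sketch, assuming the data RDD from the question and the dummy-keyed codes RDD defined just above:

// Same pipeline as above, with the values converted back to Array[Int]
val result = data
  .flatMapValues(x => x)    // (k, Array(v1, v2, ...)) => (k, v1), (k, v2), ...
  .map(_.swap)              // (k, v) => (v, k)
  .subtractByKey(codes)     // drop pairs whose first element appears in codes
  .map(_.swap)              // back to (k, v)
  .groupByKey               // (k, Iterable(v1, v2, ...))
  .mapValues(_.toArray)     // Iterable[Int] => Array[Int]

result.collect()
// e.g. Array((100,Array(1, 4, 5)), (1000,Array(10, 11, 14)))
// (element order within each array is not guaranteed after groupByKey)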

Upvotes: 3
