Reputation: 5389
val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))
val codes = sc.parallelize(Seq(2, 3, 12, 13))
val result = data.map {case (id,values) => (id, values.diff(codes))}
I would like to get the result as:
val result: Array[(String, Array[Int])] = Array(
("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)
However, when I do the set difference, I get type mismatch error.
Upvotes: 1
Views: 2785
Reputation: 330083
If you want to apply operation on a local data structure there is no reason to parallelize codes
. Just mapValues
like this:
val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))
If codes won't fit into memory you'll have to do something slightly more complicated:
// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))
data
.flatMapValues(x => x) // Flatten values (k, vs) => (k, v)
.map(_.swap) // Swap order => (v, k)
.subtractByKey(codes) // Difference
.map(_.swap) // Swap order => (k, v)
.groupByKey // Group => (k, vs)
Upvotes: 3