Reputation: 65
Now I have an RDD whose records are as follows:
key1 value1
key1 value2
key2 value3
key3 value4
key3 value5
I want to get one record per distinct key, as follows:
key1 value1
key2 value3
key3 value4
I can only use the spark-core APIs, and I don't want to aggregate the values of the same key.
Upvotes: 0
Views: 7755
Reputation: 69
Another option. It's in PySpark, but I'm almost sure there's a similar way in Scala (a rough equivalent is sketched after the output below). Assuming again that you have an RDD of (key, value) elements, the short answer is:
rdd.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
The full working code sample:
from pyspark import SparkConf, SparkContext

# Stop any active context before creating a new one
if SparkContext._active_spark_context is not None:
    SparkContext._active_spark_context.stop()

spConf = SparkConf()
spConf.setAppName('unique_keys')
sc = SparkContext(conf=spConf)
sample_data = sc.parallelize([('k1','v1'),('k1','v2'),('k2','v3'),('k3','v4'),('k3','v5')],3)
print('original rdd {}'.format(sorted(sample_data.collect(),key = lambda t: t[0])))
print('original rdd has {} unique elements'.format(sample_data.distinct().count()))
print('original rdd has {} unique keys'.format(sample_data.map(lambda t: t[0]).distinct().count()))
sample_data = sample_data.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
print('rdd with unique keys {}'.format(sorted(sample_data.collect(), key=lambda t: t[0])))
Output:
original rdd [('k1', 'v1'), ('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4'), ('k3', 'v5')]
original rdd has 5 unique elements
original rdd has 3 unique keys
rdd with unique keys [('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4')]
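Note that groupByKey doesn't guarantee the order of values within a group, so which value survives for each key is arbitrary; that's why k1 ended up with v2 above. For reference, a rough Scala equivalent of the same approach (a sketch, untested):
rdd.groupByKey().mapValues(_.head)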
Upvotes: 0
Reputation: 149538
You could do this with PairRDDFunctions.reduceByKey. Assuming you have an RDD[(K, V)]:
rdd.reduceByKey((a, b) => if (someCondition) a else b)
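For instance, a minimal self-contained sketch that keeps whichever value the reduce sees first for each key (assuming the sample data from the question; which value "wins" is not deterministic across partitions):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("unique_keys").setMaster("local[*]"))
val rdd = sc.parallelize(Seq(("key1", "value1"), ("key1", "value2"),
  ("key2", "value3"), ("key3", "value4"), ("key3", "value5")))
// (a, b) => a keeps the first value encountered for each key
rdd.reduceByKey((a, b) => a).collect().foreach(println)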
Upvotes: 3
Reputation: 11
With DataFrames and collect_set:
import org.apache.spark.sql.functions.{col, collect_set}

sqlContext.createDataFrame(rdd).toDF("k", "v")
  .groupBy("k")
  .agg(collect_set(col("v")))
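Note this collects every value for a key into a set. If you want a single value per key instead, Spark SQL's first aggregate could be swapped in (a sketch; as with reduceByKey above, which value gets picked is not deterministic):

import org.apache.spark.sql.functions.{col, first}

sqlContext.createDataFrame(rdd).toDF("k", "v")
  .groupBy("k")
  .agg(first(col("v")))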
Upvotes: 1