Caizheng Liu

Reputation: 65

How to make a Spark RDD distinct by key?

I have an RDD whose records are as follows:

key1  value1
key1  value2
key2  value3
key3  value4
key3  value5

I want to get the RDD records with distinct keys, as follows:

key1  value1
key2  value3
key3  value4

I can only use the Spark Core APIs, and I don't want to aggregate the values of the same key.

Upvotes: 0

Views: 7755

Answers (3)

Fedo

Reputation: 69

Another option. It's in PySpark, but I'm almost sure that there should be a similar way in Scala.

Assuming again that you have an RDD of (key, value) pairs:

The short answer is,

    rdd.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))

The full working code sample is,

    from pyspark import SparkConf, SparkContext

    # Stop any previously running context before creating a new one.
    if SparkContext._active_spark_context is not None:
        SparkContext._active_spark_context.stop()
    spConf = SparkConf()
    spConf.setAppName('unique_keys')
    sc = SparkContext(conf=spConf)

    sample_data = sc.parallelize([('k1','v1'),('k1','v2'),('k2','v3'),('k3','v4'),('k3','v5')],3)
    print('original rdd {}'.format(sorted(sample_data.collect(),key = lambda t: t[0])))
    print('original rdd has {} unique elements'.format(sample_data.distinct().count()))
    print('original rdd has {} unique keys'.format(sample_data.map(lambda t: t[0]).distinct().count()))

    sample_data = sample_data.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
    print('rdd with unique keys {}'.format(sorted(sample_data.collect(), key=lambda t: t[0])))

Output,

    original rdd [('k1', 'v1'), ('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4'), ('k3', 'v5')]
    original rdd has 5 unique elements
    original rdd has 3 unique keys
    rdd with unique keys [('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4')]
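
A rough Scala equivalent of the same groupByKey approach might look like this (a sketch; sc is assumed to be an active SparkContext, and as above, which value survives for each key is not guaranteed):

    val sampleData = sc.parallelize(Seq(
      ("k1", "v1"), ("k1", "v2"), ("k2", "v3"), ("k3", "v4"), ("k3", "v5")), 3)

    // Group the values under each key, then keep the first value of each group.
    val uniqueKeys = sampleData.groupByKey().mapValues(_.head)
    println(uniqueKeys.collect().sortBy(_._1).mkString(", "))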

Upvotes: 0

Yuval Itzchakov

Reputation: 149538

You could do this with PairRDDFunctions.reduceByKey. Assuming you have an RDD[(K, V)]:

    rdd.reduceByKey((a, b) => if (someCondition) a else b)
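
For example, a minimal sketch that simply keeps the first value the reduction encounters for each key (the sample data is taken from the question; which value survives per key is not deterministic across partitions):

    val rdd = sc.parallelize(Seq(
      ("key1", "value1"), ("key1", "value2"),
      ("key2", "value3"), ("key3", "value4"), ("key3", "value5")))

    // Keep whichever value the reduction sees first for each key. For a
    // deterministic pick, use a real condition, e.g. (a, b) => if (a < b) a else b.
    val distinctByKey = rdd.reduceByKey((a, b) => a)
    distinctByKey.collect().foreach(println)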

Upvotes: 3

61e74392

Reputation: 11

With DataFrames and collect_set:

    import org.apache.spark.sql.functions.{col, collect_set}

    // Collect all the values for each key into a set.
    sqlContext.createDataFrame(rdd).toDF("k", "v")
      .groupBy("k")
      .agg(collect_set(col("v")))
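
Note that collect_set aggregates all the values of a key into a set. If, as in the question, only a single value per key is wanted, a sketch along the same lines could use first instead (assuming the same rdd and column names as above):

    import org.apache.spark.sql.functions.{col, first}

    // Keep one (arbitrary) value per key instead of the whole set.
    sqlContext.createDataFrame(rdd).toDF("k", "v")
      .groupBy("k")
      .agg(first(col("v")))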

Upvotes: 1
