Saif Nitham

Reputation: 11

Get Max & Min value for each key in the RDD

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(5)

Here is a small portion of the result:

[['0.02703300', '1.30900000'],
 ['0.02703300', '0.61800000'],
 ['0.02704600', '3.90800000'],
 ['0.02704700', '4.00000000'],
 ['0.02704700', '7.44600000']]

I want to get the maximum and minimum value for each key. How can I do that?

Upvotes: 1

Views: 3646

Answers (2)

VCLL

Reputation: 158

As @mck said, you can use reduceByKey, but it can be a bit complex to understand if you have never used functional programming.

Conceptually, the method applies a function to the grouped values you would get from doing a groupByKey. Let's analyze it step by step.

>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]

Doing this we obtain an RDD with one entry per key (the first column in a pair RDD), where the value is an iterable. We can think of it as a list.

We went from the base RDD

[['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']]

to the grouped one:

[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
 ('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>), 
 ('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
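
To see what each of those iterables holds, you can materialize it as a list with mapValues (just an inspection sketch; with the sample rows above, the output should look roughly like this):

>>> rdd.groupByKey().mapValues(list).collect()
[('0.02704600', ['3.90800000']),
 ('0.02704700', ['4.00000000', '7.44600000']),
 ('0.02703300', ['1.30900000', '0.61800000'])]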

Then we must apply the desired function over the values. We can do this by passing it to the mapValues method (in my case, I pass a lambda function directly):

>>> rdd.groupByKey().mapValues(lambda vals: (max(vals), min(vals))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
 ('0.02704700', ('7.44600000', '4.00000000')),
 ('0.02703300', ('1.30900000', '0.61800000'))]

There are some considerations:

  1. reduceByKey is neater and more efficient, although it can be confusing.
  2. If you want both the maximum and the minimum, compute them at the same time, as shown above (you can also do it with reduceByKey). That way you make a single pass over the data instead of two.
  3. Try to use the DataFrame (SQL) API. It is more modern and it tries to optimize computations for you; see the sketch after this list.
  4. The reduceByKey function needs to be a bit different, since it gets two items instead of an iterable. The reducer must also return the same type it receives, so first turn each value into a (max, min) pair with mapValues:
>>> rdd.mapValues(lambda v: (v, v)).reduceByKey(lambda a, b: (max(a[0], b[0]), min(a[1], b[1]))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
 ('0.02704700', ('7.44600000', '4.00000000')),
 ('0.02703300', ('1.30900000', '0.61800000'))]
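
As a sketch of consideration 3, assuming hypothetical column names price and qty for the two columns (and noting the values are strings, so cast them to double if you need numeric ordering):

>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame(pd_binance, ['price', 'qty'])
>>> df.groupBy('price').agg(F.max('qty').alias('max_qty'), F.min('qty').alias('min_qty')).show()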

Upvotes: 3

mck

Reputation: 42422

You can use reduceByKey:

minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)
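
One caveat: since the values are strings, min and max compare them lexicographically, which matches numeric order for the fixed-width samples shown but not in general. A numeric-safe sketch (my assumption that every value parses as a float):

minimum = rdd.reduceByKey(lambda a, b: min(a, b, key=float))  # compare as numbers
maximum = rdd.reduceByKey(lambda a, b: max(a, b, key=float))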

Upvotes: 2
