Reputation: 11
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(5)
Here is a small portion of the result:
[['0.02703300', '1.30900000'],
['0.02703300', '0.61800000'],
['0.02704600', '3.90800000'],
['0.02704700', '4.00000000'],
['0.02704700', '7.44600000']]
I want to get the maximum and minimum value for each key. How can I do that?
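For reference, pd_binance is not shown in full; a minimal sketch that reproduces the sample (using just the five rows shown above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Sample of [price, amount] string pairs, taken from the rows shown above
pd_binance = [['0.02703300', '1.30900000'],
              ['0.02703300', '0.61800000'],
              ['0.02704600', '3.90800000'],
              ['0.02704700', '4.00000000'],
              ['0.02704700', '7.44600000']]
rdd = sc.parallelize(pd_binance)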
Upvotes: 1
Views: 3646
Reputation: 158
As @mck said, you can use reduceByKey, but it can be a bit complex to understand if you have never used functional programming. What the method does is apply a function to the values resulting from a groupByKey. Let's analyze it step by step.
>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]
Doing this we obtain an RDD with one entry per key (the first column in a paired RDD), where the value is an iterable. We can think of it as a list. We went from the base RDD
[['0.02703300', '1.30900000'],
['0.02703300', '0.61800000'],
['0.02704600', '3.90800000'],
['0.02704700', '4.00000000'],
['0.02704700', '7.44600000']]
to a grouped one:
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>),
('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
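If you want to inspect the grouped values themselves, a quick sketch is to materialize each iterable as a list with mapValues (the order of keys, and of values within a group, may vary):

>>> rdd.groupByKey().mapValues(list).collect()
[('0.02704600', ['3.90800000']),
 ('0.02704700', ['4.00000000', '7.44600000']),
 ('0.02703300', ['1.30900000', '0.61800000'])]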
Then we apply the desired function over the values by passing it to the mapValues method (in my case, I pass a lambda function directly):
>>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
('0.02704700', ('7.44600000', '4.00000000')),
('0.02703300', ('1.30900000', '0.61800000'))]
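One caveat worth noting: max and min here compare the values as strings, which happens to work for this sample because every value has the same fixed-width formatting. If the formatting varied (for example '10.50000000' vs '9.50000000'), a lexicographic comparison would pick the wrong value. A safer sketch is to compare numerically via a key function, which returns the same pairs for this sample:

>>> rdd.groupByKey().mapValues(lambda vs: (max(vs, key=float), min(vs, key=float))).collect()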
There are some considerations: reduceByKey is neater and more efficient, although it can be confusing. The function passed to reduceByKey needs to be a bit different, since it receives two values instead of an iterable. To keep track of both the max and the min, first turn each value into a (max, min) pair, then reduce the pairs:

>>> rdd.mapValues(lambda v: (v, v)).reduceByKey(lambda a, b: (max(a[0], b[0]), min(a[1], b[1]))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
 ('0.02704700', ('7.44600000', '4.00000000')),
 ('0.02703300', ('1.30900000', '0.61800000'))]
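As for why reduceByKey is more efficient: it combines values on each partition before the shuffle, so only one partial (max, min) pair per key travels over the network, whereas groupByKey ships every single value across the network before anything is aggregated.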
Upvotes: 3
Reputation: 42422
You can use reduceByKey:
minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)
Upvotes: 2
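If you want both extremes in a single paired RDD rather than two, one sketch is to join the two results (join is available on paired RDDs):

min_max = rdd.reduceByKey(min).join(rdd.reduceByKey(max))
min_max.collect()
# For the sample data this gives entries like ('0.02703300', ('0.61800000', '1.30900000'))

Keep in mind this triggers two aggregations plus a join; the mapValues-to-pair approach from the other answer does it in one pass.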