user1038445

Reputation: 1

pyspark reduceByKey only 1 value per key

From multiple CSV files averaging about 150 attributes per line, I need to satisfy this SQL request in PySpark:

SELECT object_id, COUNT(*), MAX(source_id), MIN(ra), MAX(ra), MIN(decl), MAX(decl)
FROM ... GROUP BY object_id

I used one map function that takes each line, filters it, and outputs the required fields:

Map Output : < object_id , Array( objectid, 1, sourceid, ra, decl ) >
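Roughly like this (the column positions, delimiter and path here are simplified for illustration; the real lines have about 150 attributes):

def extractFields(line):
    # Illustrative sketch of the map step: pull the needed columns out of
    # each CSV line and key the record by object_id. The column indices
    # are made up for the example.
    cols = line.split(",")
    object_id = cols[0]
    source_id = int(cols[1])
    ra = float(cols[2])
    decl = float(cols[3])
    return (object_id, [object_id, 1, source_id, ra, decl])

data = sc.textFile("data/*.csv").map(extractFields)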

I used one reduce function that computes all of the required aggregate functions at once (A and B are arrays as described in the map output):

def generalReduce(A, B):
    myarrayRet = [0, 0, 0, 0, 0, 0, 0]
    myarrayRet[0] = A[0]
    # count
    myarrayRet[1] = A[1] + B[1]
    # maxSrcId
    myarrayRet[2] = A[2] if A[2] > B[2] else B[2]
    # minRa
    myarrayRet[3] = A[3] if A[3] < B[3] else B[3]
    # maxRa
    myarrayRet[4] = A[3] if A[3] > B[3] else B[3]
    # minDecl
    myarrayRet[5] = A[4] if A[4] < B[4] else B[4]
    # maxDecl
    myarrayRet[6] = A[4] if A[4] > B[4] else B[4]
    return myarrayRet

The problem is that some keys have only one value, and for those keys the reduce phase outputs an array of 4 positions. This leads me to think that the reduce function is only called when a key has more than one value; am I wrong? If not, how can I output a custom value when there is only one value for a key?

Thank you.

Upvotes: 0

Views: 514

Answers (1)

Enrique GR

Reputation: 149

I am not sure I am understanding your problem correctly. If you have a key-value RDD with data like below:

Map Output : < object_id , Array( objectid, 1, sourceid, ra, decl ) >

and call data.reduceByKey(generalReduce), your function generalReduce must be commutative and associative. That is, if you have three elements, generalReduce(generalReduce(elem1, elem2), elem3) should be equal to generalReduce(elem1, generalReduce(elem2, elem3)). In your code, the return value of generalReduce(elem1, elem2) is not the same type as elem3 (a 7-element array versus a 5-element array), so you should take that into account. In fact, I don't think your code is doing what you intend it to do.

As for your second question, if this is what you want to do, you could use a map beforehand to convert the values into the correct format.
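For example, something along these lines (a sketch only, assuming your values are the [objectid, 1, sourceid, ra, decl] arrays from your map output; the helper toAggregate is my own naming):

def toAggregate(v):
    # Expand a single record into the aggregate shape
    # [object_id, count, maxSrcId, minRa, maxRa, minDecl, maxDecl],
    # so the reduce inputs and output have the same type.
    object_id, count, source_id, ra, decl = v
    return [object_id, count, source_id, ra, ra, decl, decl]

def generalReduce(A, B):
    return [A[0],
            A[1] + B[1],      # count
            max(A[2], B[2]),  # maxSrcId
            min(A[3], B[3]),  # minRa
            max(A[4], B[4]),  # maxRa
            min(A[5], B[5]),  # minDecl
            max(A[6], B[6])]  # maxDecl

result = data.mapValues(toAggregate).reduceByKey(generalReduce)

This way a key with a single value is already in the final 7-element format (reduceByKey simply passes it through without calling the function), and generalReduce is commutative and associative because its inputs and output have the same shape.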

Upvotes: 1
