dark

Reputation: 256

Python Spark: sort elements based on value

I am new to Python Spark and I need your help. Thanks in advance!

So here we go, I have this piece of script:

from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateOfCL):
        #the result will be in [0,1]
        dot=datetime.now()
        od=datetime.strptime("Jan 01 2010", "%b %d %Y")

        return (float((dateOfCL-od).days)/float((dot-od).days))

def addition(a, b):
        #convert any raw datetime to a normalized float before adding
        a1=a
        b1=b
        if not type(a) is float:
                a1=getNormalizedDate(a)
        if not type(b) is float:
                b1=getNormalizedDate(b)

        return float(a1+b1)

def debugFunction(x):
        print "x[0]: " + str(type(x[0]))
        print "x[1]: " + str(type(x[1])) + " --> " + str(x[1])
        return x[1]



if __name__ == '__main__':
        sc = SparkContext("local", "File Scores")

        textFile = sc.textFile("/data/spark/file.csv")
        #print "Number of lines: " + str(textFile.count())

        test1 = textFile.map(lambda line: line.split(";"))
        # result of this:
        # [u'01', u'01', u'add', u'fileName', u'Path', u'1', u'info', u'info2', u'info3', u'Sep 24 2014']

        test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y")))

        test6=test2.reduceByKey(addition)
        #print test6
        test6.persist()

        result=sorted(test6.collect(), key=debugFunction)

This ends with an error:

Traceback (most recent call last):
  File "/data/spark/script.py", line 40, in <module>
    result=sorted(test6.collect(), key=lambda x:x[1])
TypeError: can't compare datetime.datetime to float

For info, test6.collect() gives this content:

[(u'file1', 0.95606060606060606), 
(u'file2', 0.91515151515151516), 
(u'file3', 0.8797979797979798), 
(u'file4', 0.0), 
(u'file5', 0.94696969696969702), 
(u'file6', 0.95606060606060606), 
(u'file7', 0.98131313131313136), 
(u'file8', 0.86161616161616161)]

and I want to sort it based on the float value (not the key). How should I proceed, please?

Thank you guys.

Upvotes: 1

Views: 3086

Answers (2)

Elior Malul

Reputation: 691

I for one prefer working with DataFrames over RDDs whenever possible; the API is more high-level. You can order the data in a DataFrame by a specific column like so:

# read the CSV into a DataFrame
df = spark.read.csv('input_data.csv')
# sort by the given column, then write the result back out as CSV
df.sort('column_name').write.csv(path='output_path')

where spark is an instance of the pyspark.sql.session.SparkSession class.
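For the file in the question (semicolon-delimited, no header), a minimal sketch could look like the following; the _c9 column index for the date is an assumption based on the sample row in the question:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("File Scores").getOrCreate()

# no header, so Spark auto-names the columns _c0, _c1, ...;
# _c3 is the file name and _c9 the "Sep 24 2014" style date (assumed)
df = spark.read.csv('/data/spark/file.csv', sep=';')

# parse the date string and sort the whole DataFrame by it
df.withColumn('date', F.to_date(df['_c9'], 'MMM dd yyyy')) \
  .orderBy('date') \
  .show()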

Upvotes: 0

dark

Reputation: 256

For those who might be interested, I found the problem. I was reducing by key and doing the date-to-float normalization inside the addition function. Some of the files are unique, so reduceByKey never calls that function for them, and their value is still a date instead of a float.
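A quick toy example (made-up data, not my real file) shows the behavior:

# reduceByKey only calls the reduce function for keys that have at
# least two values; a key with a single value keeps it untouched
pairs = sc.parallelize([("a", 1.0), ("a", 2.0), ("b", "unreduced")])
print pairs.reduceByKey(lambda x, y: x + y).collect()
# [('a', 3.0), ('b', 'unreduced')]  <- 'b' was never passed to the lambda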

What I do now is:

test2 = test1.map(lambda line: (line[3], getNormalizedDate(datetime.strptime(line[-1], "%b %d %Y"))))

That makes pairs of (file, float).

Only then do I reduce by key.

Finally, the step

result = sorted(test6.collect(), key=lambda x: x[1])

gives me the right sorting I was looking for.
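Putting it all together, here is a sketch of the corrected script (same path and date format as above):

from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateOfCL):
        #the result will be in [0,1], relative to Jan 01 2010 .. today
        dot = datetime.now()
        od = datetime.strptime("Jan 01 2010", "%b %d %Y")
        return float((dateOfCL - od).days) / float((dot - od).days)

sc = SparkContext("local", "File Scores")
test1 = sc.textFile("/data/spark/file.csv").map(lambda line: line.split(";"))

#normalize the date up front, so every value is already a float
test2 = test1.map(lambda line: (line[3], getNormalizedDate(datetime.strptime(line[-1], "%b %d %Y"))))

#the reduction now only ever adds floats, even for unique keys
test6 = test2.reduceByKey(lambda a, b: a + b)

#sorting the collected (file, float) pairs by the float value now works
result = sorted(test6.collect(), key=lambda x: x[1])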

I hope this helps!!

Upvotes: 1
