Reputation: 256
I am new to Python and Spark and I need your help, so thanks in advance!
So here we go, I have this piece of script:
from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateOfCL):
    # the result will be in [0,1]
    dot = datetime.now()
    od = datetime.strptime("Jan 01 2010", "%b %d %Y")
    return (float((dateOfCL - od).days) / float((dot - od).days))

def addition(a, b):
    a1 = a
    b1 = b
    if not type(a) is float:
        a1 = getNormalizedDate(a)
    if not type(b) is float:
        b1 = getNormalizedDate(b)
    return float(a1 + b1)

def debugFunction(x):
    print "x[0]: " + str(type(x[0]))
    print "x[1]: " + str(type(x[1])) + " --> " + str(x[1])
    return x[1]

if __name__ == '__main__':
    sc = SparkContext("local", "File Scores")
    textFile = sc.textFile("/data/spark/file.csv")
    # print "Number of lines: " + str(textFile.count())
    test1 = textFile.map(lambda line: line.split(";"))
    # result of this:
    # [u'01', u'01', u'add', u'fileName', u'Path', u'1', u'info', u'info2', u'info3', u'Sep 24 2014']
    test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y")))
    test6 = test2.reduceByKey(addition)
    # print test6
    test6.persist()
    result = sorted(test6.collect(), key=debugFunction)
This ends with an error:
Traceback (most recent call last):
File "/data/spark/script.py", line 40, in <module>
result=sorted(test6.collect(), key=lambda x:x[1])
TypeError: can't compare datetime.datetime to float
For info, test6.collect() gives this content:
[(u'file1', 0.95606060606060606),
(u'file2', 0.91515151515151516),
(u'file3', 0.8797979797979798),
(u'file4', 0.0),
(u'file5', 0.94696969696969702),
(u'file6', 0.95606060606060606),
(u'file7', 0.98131313131313136),
(u'file8', 0.86161616161616161)]
and I want to sort it based on the float value (not the key). How should I proceed, please?
Thank you guys.
Upvotes: 1
Views: 3086
Reputation: 691
I, for one, prefer working with DataFrames over RDDs whenever possible; the API is more high-level. You can order the data in a DataFrame by a specific column like so:
df = spark.read.csv('input_data.csv')
df.sort('column_name').write.csv(path='output_path')
where spark is an instance of the pyspark.sql.session.SparkSession class.
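For reference, here is a minimal sketch of how creating that session and sorting might look; the file path, the separator, and the auto-generated column name _c9 are assumptions and would need to match your data:

from pyspark.sql import SparkSession

# create (or reuse) a SparkSession
spark = SparkSession.builder.appName("File Scores").getOrCreate()

# read the semicolon-separated file; with no header, Spark names the columns _c0, _c1, ...
df = spark.read.csv("/data/spark/file.csv", sep=";")

# sort by one of those columns (descending here) and write the result out
df.sort(df["_c9"].desc()).write.csv(path="/data/spark/output")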
Upvotes: 0
Reputation: 256
For those who might be interested, I found the problem. I was reducing by key, and after that performing the addition of the items contained in the list of values. Some of the files are unique and are not affected by this reduction, so they still have a date instead of a float.
What I do now is
test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y"))).mapValues(getNormalizedDate)
That makes pairs of (file, float).
Only then do I reduce by key.
Finally, the step
result = sorted(test6.collect(), key=lambda x: x[1])
gives me the right sorting I was looking for.
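Putting it all together, a minimal sketch of what my corrected pipeline could look like (same CSV path and column positions as in my question; the variable names are just for illustration):

from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateOfCL):
    # normalize a date to [0,1] between Jan 01 2010 and today
    od = datetime.strptime("Jan 01 2010", "%b %d %Y")
    dot = datetime.now()
    return float((dateOfCL - od).days) / float((dot - od).days)

if __name__ == '__main__':
    sc = SparkContext("local", "File Scores")
    lines = sc.textFile("/data/spark/file.csv").map(lambda line: line.split(";"))
    # build (file, float) pairs up front, so reduceByKey only ever sees floats
    pairs = lines.map(lambda line: (line[3],
                                    getNormalizedDate(datetime.strptime(line[-1], "%b %d %Y"))))
    totals = pairs.reduceByKey(lambda a, b: a + b)
    # sort by the float value, not the key
    result = sorted(totals.collect(), key=lambda x: x[1])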
I hope this helps!!
Upvotes: 1