Raz

Reputation: 5

Sorting a dataframe in PySpark without sql functions

I'm having trouble printing the result of this query with the months sorted in the proper order.

Is there a PySpark function to sort the Month column in descending order (without using SQL commands)?

from pyspark import SparkContext
from pyspark.sql import SQLContext
from operator import add

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

rows = sc.textFile("data.csv")
data = rows.map(lambda line: line.split(","))
header = data.first()

q = data.filter(lambda line: line != header)\
        .map(lambda x:(x[1], 1))\
        .reduceByKey(add)\
        .sortByKey(True)\
        .collect()

sqlContext.createDataFrame(q, ['Month','Total number of operated flights']).show()

+-----+--------------------------------+
|Month|Total number of operated flights|
+-----+--------------------------------+
|    1|                          621559|
|   10|                          629992|
|   11|                          605149|
|   12|                          614139|
|    2|                          565604|
|    3|                          639209|
|    4|                          614648|
|    5|                          631609|
|    6|                          629280|
|    7|                          648560|
|    8|                          653279|
|    9|                          600187|
+-----+--------------------------------+

Upvotes: 0

Views: 181

Answers (1)

Dominik Filipiak

Reputation: 1272

That's because the Month column is treated as a string, so the keys are sorted lexicographically. There are several ways to get the correct order. For instance, you can cast the value to int in the RDD map call and use sortByKey(False) for descending order:

q = data.filter(lambda line: line != header)\
    .map(lambda x:(int(x[1]), 1))\
    .reduceByKey(add)\
    .sortByKey(False)\
    .collect()
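
To see why the string keys come out as 1, 10, 11, 12, 2, ..., here is a minimal plain-Python sketch (no Spark needed). The variable names are only illustrative, and it assumes the keys are compared the way Python compares strings, which is how the output in the question looks:

```python
# Month keys as they come out of line.split(","): strings, not numbers.
months_as_str = [str(m) for m in range(1, 13)]

# Strings compare character by character, so "10" sorts before "2".
sorted_as_str = sorted(months_as_str)

# Converting to int first gives numeric order; reverse=True makes it
# descending, which mirrors sortByKey(False) on int keys.
sorted_desc = sorted((int(m) for m in months_as_str), reverse=True)

print(sorted_as_str)
print(sorted_desc)
```

The first list reproduces the order shown in the question's table, the second is the descending order you are after.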

I'm not quite sure this will match your expectations, but you don't need to call collect() to make a DataFrame from an RDD. You can get one directly by running:

df = data.filter(lambda line: line != header)\
    .map(lambda x:(int(x[1]), 1))\
    .reduceByKey(add)\
    .sortByKey(False)\
    .toDF()

You can do the casting with the DataFrame API as well, with df.withColumn('Month', df['Month'].cast(pyspark.sql.types.IntegerType())), but you've stated you don't want to use Spark SQL.

Upvotes: 1
