Essex

Reputation: 6118

PySpark: RDD and "WHERE" operation

I'm learning how to handle Spark RDDs with Python, and I can't find a way to apply a WHERE-style condition with rdd.filter().

I have a CSV file which looks like this:

id,firstname,city,age,job,salary,childen,awards
1, Yves, OLS-ET-RINHODES, 55, Pilote de chasse, 3395, 3, 3
2, Paul, MARTOT, 32, Pilote d'helicoptere, 2222, 4, 5
3, Steve, DIEULEFIT, 53, Navigateur aerien, 2152, 3, 2
4, Valentin, FEUILLADE, 27, Pilote de chasse, 1776, 0, 2
...

And this is my Python script:

#!/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

#Context properties
conf = SparkConf().setAppName("Aeroport")
sc = SparkContext(conf=conf)

#Data Reading
data = sc.textFile("hdfs://master:9000/testfile.csv")

#Split each column
dataset = data.map(lambda l: l.split(','))

#Search children number by city
nbChildByCity = dataset.map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)

print "Nombre enfant par ville naissance : " + str(nbChildByCity.collect())

#Search children number by city with father > 50 years old
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
#nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1]) 

print "Nombre enfant par ville naissance avec père > 50 ans : " + str(nbChildByCityFather.collect()) 

My issue is with this step: #Search children number by city with father > 50 years old

I can't manage to add the last condition (father > 50 years old). How do I write a WHERE condition on an RDD?

I tried this:

nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1]) 

But neither returns any result.

Upvotes: 0

Views: 310

Answers (2)

Ramesh Maharjan

Reputation: 41957

You should filter first, before you apply the reduceByKey. Note that row[3] is a string, so it needs to be converted to int before comparing it with 50 (also, row[3] > 50 in nbChildByCity is parsed by Python as a chained comparison, (row[3] > 50) and (50 in nbChildByCity), which is not what you intended):

nbChildByCityFather = dataset.filter(lambda row: int(row[3].strip()) > 50)\
                             .map(lambda row: (row[2], 1))\
                             .reduceByKey(lambda a, b: a + b)
print "Nombre enfant par ville naissance avec pere > 50 ans : " + str(nbChildByCityFather.collect())

Note: this only works if you remove the header line from the CSV file, or filter it out in the code.
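For example, a minimal sketch of one common way to drop it, assuming the header is the first line of the file:

#keep every line except the header, then split as before
header = data.first()
dataset = data.filter(lambda l: l != header).map(lambda l: l.split(','))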

Upvotes: 1

ernest_k

Reputation: 45309

This is easier and more efficient to implement with the DataFrame API (see the alternative approach at the bottom).

To count children in rows where the age is over 50, you first need to filter. You also need to use the children column (index 6) in your map call, rather than a constant 1:

Number of children by city:

#note that it uses the child count (column 6), not 1
nbChildByCity = dataset.map(lambda row: (row[2], int(row[6].strip())))

nbChildByCity.collect()

Outputs:

[(' OLS-ET-RINHODES', 3), (' MARTOT', 4), (' DIEULEFIT', 3), (' FEUILLADE', 0)]

Same, but with the age filter applied:

nbChildByCity50 = dataset.filter(lambda l: int(l[3].strip()) > 50)\
                         .map(lambda row: (row[2], int(row[6].strip())))\
                         .reduceByKey(lambda a, b: a + b)
print("Nombre enfant par ville naissance :" + str(nbChildByCity50.collect()))

Outputs:

Nombre enfant par ville naissance :[(' OLS-ET-RINHODES', 3), (' DIEULEFIT', 3)]



Note that it's easier and more appropriate to do this with the DataFrame API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Aeroport").getOrCreate()
df = spark.read.csv('cities.csv', header=True, inferSchema=True)
grp = df.groupBy(['city'])
grp.sum('childen').show()

Which gives:

+----------------+------------+
|            city|sum(childen)|
+----------------+------------+
|       FEUILLADE|         0.0|
|          MARTOT|         4.0|
|       DIEULEFIT|         3.0|
| OLS-ET-RINHODES|         3.0|
+----------------+------------+

And with a filter by age:

grp = df.where('age > 50').groupBy(['city'])
grp.sum('childen').show()

Which outputs:

+----------------+------------+
|            city|sum(childen)|
+----------------+------------+
|       DIEULEFIT|         3.0|
| OLS-ET-RINHODES|         3.0|
+----------------+------------+
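One quirk visible in the outputs above: the sample CSV has a space after each comma, so the city values keep a leading blank (' OLS-ET-RINHODES'). A minimal sketch of one way to trim that in the DataFrame version (column names taken from the sample file):

from pyspark.sql import functions as F

#trim stray spaces from the city column, then aggregate as before
df_clean = df.withColumn('city', F.trim(df['city']))
df_clean.where('age > 50').groupBy('city').agg(F.sum('childen')).show()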

Upvotes: 2
