Reputation: 6118
I'm learning how to handle Spark RDDs with Python, and I can't find a solution for expressing a where condition with rdd.filter().
I have a CSV file that looks like this:
id,firstname,city,age,job,salary,childen,awards
1, Yves, OLS-ET-RINHODES, 55, Pilote de chasse, 3395, 3, 3
2, Paul, MARTOT, 32, Pilote d'helicoptere, 2222, 4, 5
3, Steve, DIEULEFIT, 53, Navigateur aerien, 2152, 3, 2
4, Valentin, FEUILLADE, 27, Pilote de chasse, 1776, 0, 2
...
And this is my Python script:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
#Context properties
conf = SparkConf().setAppName("Aeroport")
sc = SparkContext(conf=conf)
#Data Reading
data = sc.textFile("hdfs://master:9000/testfile.csv")
#Split each column
dataset = data.map(lambda l: l.split(','))
#Search children number by city
nbChildByCity = dataset.map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance : " + str(nbChildByCity.collect())
#Search children number by city with father > 50 years old
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
#nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
print "Nombre enfant par ville naissance avec père > 50 ans : " + str(nbChildByCityFather.collect())
My issue is this step: #Search children number by city with father > 50 years old. I can't manage to add the last condition (father over 50 years old). How do I write a where condition on an RDD?
I tried this:
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1])
But neither returns any result.
Upvotes: 0
Views: 310
Reputation: 41957
You should filter first, before you apply reduceByKey:
nbChildByCityFather = dataset.filter(lambda row : int(row[3].strip()) > 50)\
    .map(lambda row : (row[2], 1))\
    .reduceByKey(lambda a, b : a + b)
print "Nombre enfant par ville naissance avec pere > 50 ans : " + str(nbChildByCityFather.collect())
Note: this only works if you remove the header line from the CSV file, or filter it out in the code, for example as sketched below.
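A minimal sketch of that header filter, assuming data is the raw textFile RDD from the question:
header = data.first()  # the first line of the file is the header row
dataset = data.filter(lambda l: l != header).map(lambda l: l.split(','))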
Upvotes: 1
Reputation: 45309
This is easier and more efficient to implement with the DataFrame API (see the alternative approach at the bottom).
To get the number of children by city where the father's age is over 50, you first need to filter on the age column. You also need to use the children column (index 6) in your reduce
call:
Number of children by city:
nbChildByCity = dataset.map(lambda row : (row[2], int(row[6].strip())))
# note that it's using the child count (row[6]), not 1
nbChildByCity.collect()
Outputs:
[(' OLS-ET-RINHODES', 3), (' MARTOT', 4), (' DIEULEFIT', 3), (' FEUILLADE', 0)]
The same, but with the age filter (father over 50):
nbChildByCity50 = dataset.filter(lambda l: int(l[3]) > 50)\
    .map(lambda row : (row[2], int(row[6].strip())))\
    .reduceByKey(lambda a,b:a+b)
print("Nombre enfant par ville naissance :" + str(nbChildByCity50.collect()))
Outputs:
Nombre enfant par ville naissance :[(' OLS-ET-RINHODES', 3), (' DIEULEFIT', 3)]
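The city keys keep a leading space because split(',') does not trim the fields. A minimal sketch of stripping each field at split time, reusing the question's variable names:
dataset = data.map(lambda l: [field.strip() for field in l.split(',')])
With that, the keys come out as 'OLS-ET-RINHODES' rather than ' OLS-ET-RINHODES'.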
Note that it's easier and more appropriate to do this using the DataFrame API:
# in a standalone script you need a SparkSession first (the pyspark shell already provides one as 'spark')
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('cities.csv', header=True, inferSchema=True)
grp = df.groupBy(['city'])
grp.sum('childen').show()
Which gives:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| FEUILLADE| 0.0|
| MARTOT| 4.0|
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
And with a filter by age:
grp = df.where('age > 50').groupBy(['city'])
grp.sum('childen').show()
Which outputs:
+----------------+------------+
| city|sum(childen)|
+----------------+------------+
| DIEULEFIT| 3.0|
| OLS-ET-RINHODES| 3.0|
+----------------+------------+
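If you prefer column expressions over the SQL-style string, roughly the same query can be written with pyspark.sql.functions (a sketch against the same df; the alias name is only illustrative):
from pyspark.sql import functions as F

# sum the 'childen' column per city, keeping only rows where age > 50
df.where(F.col('age') > 50) \
  .groupBy('city') \
  .agg(F.sum('childen').alias('children_total')) \
  .show()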
Upvotes: 2