Reputation: 22518
I would like to keep only the employees whose department ID is referenced in the second table.
Employee table
LastName    DepartmentID
Rafferty    31
Jones       33
Heisenberg  33
Robinson    34
Smith       34
Department table
DepartmentID
31
33
I have tried the following code, which does not work:
employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()
Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.
Upvotes: 14
Views: 21268
Reputation: 495
Filtering multiple values in multiple columns:
In the case where you're pulling data from a database (Hive or another SQL-type DB in this example) and need to filter on multiple columns, it can be easier to load the table with the first filter and then iterate the remaining filters over the RDD (multiple small iterations are the encouraged way of Spark programming):
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// push the first, coarse filter down into the SQL query itself
val first_data_filter = hc.sql("SELECT col1, col2, col3 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3')")
// then narrow the result with successive filters on the returned rows
val second_data_filter = first_data_filter.filter(row => row(1) == "50" || row(1) == "20")
val final_filtered_data = second_data_filter.filter(row => row(0) == "1500")
Of course you have to know your data a little bit to filter on the right values, but that's part of the analysis process.
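For the PySpark side of the question, here is a rough sketch of the same pattern. It assumes a Hive-enabled build and the Spark 1.x SchemaRDD API (as in the asker's 1.1.0 setup), where the result of hc.sql supports plain RDD operations; tableName, the columns and the filter values are the same placeholders as above:
from pyspark.sql import HiveContext

hc = HiveContext(sc)
# push the first, coarse filter down into the SQL query
first_data_filter = hc.sql("SELECT col1, col2, col3 FROM tableName "
                           "WHERE col3 IN ('value_1', 'value_2', 'value_3')")
# then narrow the result with successive row-level filters
second_data_filter = first_data_filter.filter(lambda row: row[1] in ("50", "20"))
final_filtered_data = second_data_filter.filter(lambda row: row[0] == "1500")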
Upvotes: 0
Reputation: 37435
In this case, what you want to achieve is to filter each partition with the data contained in the department table. This would be the basic solution:
val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}
If your department data is large, a broadcast variable will improve performance by delivering the data once to all the nodes, instead of having to serialize it with each task:
val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}
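For completeness, a minimal PySpark sketch of the same broadcast idea, reusing the RDD names and sample data from the question:
employee = sc.parallelize([['Raffery', 31], ['Jones', 33], ['Heisenberg', 33], ['Robinson', 34], ['Smith', 34]])
department = sc.parallelize([31, 33])

# collect the (small) department table on the driver and broadcast it once to every executor
dept_bc = sc.broadcast(set(department.collect()))
# the closure captures only the broadcast handle, not an RDD, so this avoids the Py4JError from the question
employee.filter(lambda e: e[1] in dept_bc.value).collect()
# [['Raffery', 31], ['Jones', 33], ['Heisenberg', 33]]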
Although using a join would also work, it is a very expensive solution, as it requires a distributed shuffle of the data (byKey) to perform the join. Given that the requirement is a simple filter, sending the data to each partition (as shown above) will give much better performance.
Upvotes: 26
Reputation: 22518
I finally implemented a solution using a join. I had to add a dummy 0 value to the department RDD to avoid an exception from Spark:
employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# swap name and id so that the department id becomes the key
employee = sc.parallelize(employee).map(lambda e: (e[1], e[0]))
# join works on (key, value) pair RDDs, so pair each department id with a dummy 0 value
department = sc.parallelize(department).map(lambda d: (d, 0))
employee.join(department).map(lambda e: (e[1][0], e[0])).collect()
output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]
Upvotes: 10