Reputation: 267
I am very new to Spark, and I have a query that brings data from two Oracle tables. The tables have to be joined by a field, which works fine with the code below. However, I also need to apply filters, as in an Oracle WHERE clause (for example, fetch employees whose age is between 25 and 50), apply a GroupBy, and sort the final results with an OrderBy. The problem is that the only operations performed correctly are the retrieval of all the data from the tables and the join between them. The filters are simply not applied, and I have no idea why. I am sure I am missing something, because there are no compile errors. The data loads fine, but the where clauses seem to have no effect, even though there are employees with age between 25 and 50. Can you please help me out with this? Many thanks!
public static JavaRDD<Row> getResultsFromQuery(String connectionUrl) {
    JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf()
            .setAppName("SparkJdbcDs").setMaster("local"));
    SQLContext sqlContext = new SQLContext(sparkContext);

    Map<String, String> options = new HashMap<>();
    options.put("driver", "oracle.jdbc.OracleDriver");
    options.put("url", connectionUrl);
    options.put("dbtable", "EMPLOYEE");

    DataFrameReader dataFrameReader = sqlContext.read().format("jdbc")
            .options(options);
    DataFrame dataFrameFirstTable = dataFrameReader.load();

    options.put("dbtable", "DEPARTMENT");
    dataFrameReader = sqlContext.read().format("jdbc").options(options);
    DataFrame dataFrameSecondTable = dataFrameReader.load();

    // JOIN. It works just fine!
    DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable,
            "DEPARTMENTID");

    // FILTERS. They do not throw an error, but they are not applied.
    // The results are always the same, as if no filters existed.
    resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
    resultingDataFrame.where(resultingDataFrame.col("AGE").leq(50));

    JavaRDD<Row> resultFromQuery = resultingDataFrame.toJavaRDD();

    // Here I confirm that the number of rows is always the same,
    // so the filters do not work.
    System.out.println("Number of rows " + resultFromQuery.count());

    return resultFromQuery;
}
Upvotes: 2
Views: 2957
Reputation: 331
people.select("person_id", "first_name").filter(people("person_id") == 2).show
This won't work; you'll get the following error:

Error: overloaded method value filter with alternatives: (conditionExpr: String)org.apache.spark.sql.DataFrame <and> (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be applied to (Boolean)

The problem is that Scala's == returns a Boolean, while filter and where expect a Column (or a string expression), so you can't pass a Boolean to them.
Use === instead, which returns a Column. The following two queries are equivalent ways to select the matching row from a Spark DataFrame (where is an alias for filter):

people.select("person_id", "first_name").filter(people("person_id") === 2).show
people.select("person_id", "first_name").where(people("person_id") === 2).show
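Since the question uses the Java API, note that Java has no === operator; the comparison method on Column is equalTo, which likewise returns a Column. A minimal sketch of the same filter in Java, assuming a DataFrame named people with these columns:

// Java equivalent of the Scala === comparison: Column.equalTo returns a Column,
// which is what filter/where expect. "people" is assumed to be an existing DataFrame.
people.select("person_id", "first_name")
      .filter(people.col("person_id").equalTo(2))
      .show();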
Upvotes: 1
Reputation: 67075
where returns a new DataFrame and does NOT alter the existing one, so you need to store the output:
DataFrame greaterThan25 = resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
DataFrame lessThanGreaterThan = greaterThan25.where(resultingDataFrame.col("AGE").leq(50));
JavaRDD<Row> resultFromQuery = lessThanGreaterThan.toJavaRDD();
Or you can just chain it:
// Note: inside its own initializer, resultingDataFrame does not exist yet,
// so the column must be referenced through one of the input DataFrames.
DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, "DEPARTMENTID")
        .where(dataFrameFirstTable.col("AGE").geq(25))
        .where(dataFrameFirstTable.col("AGE").leq(50));
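The same rule applies to every DataFrame transformation, including the GroupBy and OrderBy the question mentions: each call returns a new DataFrame, so keep chaining or assign the result to a variable. A hedged sketch of what the full pipeline might look like, assuming Spark 1.4+ (where functions.col and Column.between are available); the DEPARTMENTNAME column and the avg(AGE) aggregate are made up here purely for illustration:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;

// Each transformation returns a new DataFrame; chaining keeps every step applied.
// DEPARTMENTNAME and the avg(AGE) aggregate are hypothetical, for illustration only.
DataFrame result = dataFrameFirstTable.join(dataFrameSecondTable, "DEPARTMENTID")
        .where(col("AGE").between(25, 50))   // both bounds in a single condition
        .groupBy(col("DEPARTMENTNAME"))
        .agg(avg(col("AGE")).alias("AVG_AGE"))
        .orderBy(col("AVG_AGE"));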
Upvotes: 5