Stefan S

Reputation: 267

Spark filters are never applied to DataFrame in Java

I am very new to Spark, and I have a query that brings data from two Oracle tables. The tables have to be joined on a field, which works fine with the code below. However, I also need to apply filters, as in an Oracle WHERE clause, for example to bring back employees whose age is between 25 and 50, and then group the results with GroupBy and sort them with OrderBy. The problem is that the only operations performed correctly are the retrieval of all data from both tables and the join between them. The filters are simply not applied, and I have no idea why. There are no compile errors and the data loads fine, but the where clauses seem to have no effect, even though there are employees with age between 25 and 50. Can you please help me out with this? Many thanks!

public static JavaRDD<Row> getResultsFromQuery(String connectionUrl) {

    JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf()
            .setAppName("SparkJdbcDs").setMaster("local"));
    SQLContext sqlContext = new SQLContext(sparkContext);

    Map<String, String> options = new HashMap<>();
    options.put("driver", "oracle.jdbc.OracleDriver");
    options.put("url", connectionUrl);
    options.put("dbtable", "EMPLOYEE");

    DataFrameReader dataFrameReader = sqlContext.read().format("jdbc")
            .options(options);

    DataFrame dataFrameFirstTable = dataFrameReader.load();

    options.put("dbtable", "DEPARTMENT");

    dataFrameReader = sqlContext.read().format("jdbc").options(options);

    DataFrame dataFrameSecondTable = dataFrameReader.load();

    //JOIN. IT WORKS JUST FINE!!!

    DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, 
            "DEPARTMENTID");


    //FILTERS. THEY DO NOT THROW ERROR, BUT ARE NOT APPLIED. RESULTS ARE ALWAYS THE SAME, WITHOUT FILTERS
    resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
    resultingDataFrame.where(resultingDataFrame.col("AGE").leq(50));

    JavaRDD<Row> resultFromQuery = resultingDataFrame.toJavaRDD();

    //HERE I CONFIRM THAT THE NUMBER OF ROWS GOTTEN IS ALWAYS THE SAME, SO THE FILTERS DO NOT WORK.
    System.out.println("Number of rows "+resultFromQuery.count());

    return resultFromQuery;

}

Upvotes: 2

Views: 2957

Answers (2)

Prabhat Jain

Reputation: 331

In the Scala API, writing the comparison with a plain == inside filter, as in:

people.select("person_id", "first_name").filter(people("person_id") == 2).show

won't work; you'll get the following error:

Error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
cannot be applied to (Boolean)

filter (and where) expect a Column or a condition string, not a Boolean: == evaluates to a plain Boolean, while === on a Column builds the comparison expression that filter needs.

These two queries select a single row from a Spark DataFrame, using the two different clauses, filter and where:

people.select("person_id", "first_name").filter(people("person_id") === 2).show

people.select("person_id", "first_name").where(people("person_id") === 2).show

Use either of the above queries to select a single row from the DataFrame.
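
Since the question uses the Java API: Java has no === operator, so the same comparison goes through Column.equalTo instead. A minimal sketch of the equivalent Java call, assuming a hypothetical people DataFrame with the same columns as above:

// Java equivalent of the Scala queries above (Spark 1.x API).
// "people" is a hypothetical DataFrame with person_id and first_name columns.
DataFrame singleRow = people
        .select("person_id", "first_name")
        .where(people.col("person_id").equalTo(2)); // equalTo replaces Scala's ===
singleRow.show();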

Upvotes: 1

Justin Pihony

Reputation: 67075

where returns a new DataFrame and does NOT alter the existing one, so you need to store the output:

DataFrame greaterThan25 = resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
DataFrame lessThanGreaterThan = greaterThan25.where(resultingDataFrame.col("AGE").leq(50));
JavaRDD<Row> resultFromQuery = lessThanGreaterThan.toJavaRDD();

Or you can just chain it:

DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, "DEPARTMENTID")
  .where(col("AGE").geq(25))   // col(...) is the static import org.apache.spark.sql.functions.col;
  .where(col("AGE").leq(50));  // the variable cannot reference itself in its own initializer
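
The two range conditions can also be combined into a single where with Column.and. And since groupBy and orderBy likewise return new DataFrames, the same store-or-chain rule applies to the grouping and sorting the question asks about. A sketch, not the original poster's code, assuming the static import org.apache.spark.sql.functions.col and a simple count aggregation:

import static org.apache.spark.sql.functions.col;

DataFrame filtered = dataFrameFirstTable
        .join(dataFrameSecondTable, "DEPARTMENTID")
        .where(col("AGE").geq(25).and(col("AGE").leq(50))); // both bounds in one filter

// groupBy and orderBy also return new DataFrames, so keep assigning the result.
DataFrame grouped = filtered
        .groupBy(col("DEPARTMENTID"))
        .count()                          // hypothetical aggregation; adds a "count" column
        .orderBy(col("count").desc());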

Upvotes: 5
