Xiaoyu Chen

Reputation: 335

An error about Dataset.filter in Spark SQL

I want to filter the dataset so that it only contains the records that can be found in MySQL.

Here is the Dataset:

dataset.show()
+---+-----+
| id| name|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+

And here is the table in MySQL:

+---+-----+
| id| name|
+---+-----+
|  1|    a|
|  3|    c|
|  4|    d|
+---+-----+

This is my code (running in spark-shell):

import java.util.Properties

case class App(id: Int, name: String)

val data = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val dataFrame = data.map { case (id, name) => App(id, name) }.toDF
val dataset = dataFrame.as[App]

val url = "jdbc:mysql://ip:port/tbl_name"
val table = "my_tbl_name"
val user = "my_user_name"
val password = "my_password"

val properties = new Properties()
properties.setProperty("user", user)
properties.setProperty("password", password)

dataset.filter((x: App) => 
  0 != sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count).show()

But I get a "java.lang.NullPointerException":

    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
    at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)

I have tested

val x = App(1, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count

val y = App(5, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + y.id.toString), properties).count

and I get the correct results, 1 and 0.

What's the problem with filter?

Upvotes: 3

Views: 872

Answers (1)

zero323

Reputation: 330423

What's the problem with filter?

You get an exception because you're trying to execute an action (count on a DataFrame) inside a transformation (filter). Nested actions and transformations are not supported in Spark.

The correct solution is, as usual, either a join on compatible data structures, a lookup using a local data structure, or a query issued directly against the external system (without using Spark data structures). A sketch of the first two options follows.
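
For example, a minimal sketch of the join and local-lookup approaches, reusing the url, table, and properties values from the question and assuming the MySQL id column is an integer:

// Read the MySQL table once as a DataFrame instead of issuing
// one JDBC query per record inside filter.
val mysqlDF = sqlContext.read.jdbc(url, table, properties)

// Option 1: a left semi join keeps only the rows of `dataset`
// whose id also exists in the MySQL table (here: 1 and 3).
val dataDF = dataset.toDF
dataDF.join(mysqlDF, dataDF("id") === mysqlDF("id"), "leftsemi").as[App].show()

// Option 2: if the MySQL table is small, collect its ids to the
// driver and filter against a local Set (no nested Spark calls).
val validIds = mysqlDF.select("id").collect().map(_.getInt(0)).toSet
dataset.filter(app => validIds.contains(app.id)).show()

The join keeps everything distributed, so it also works for large MySQL tables; the local-lookup variant is only appropriate when the set of ids comfortably fits in driver memory.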

Upvotes: 3
