Reputation: 335
I want to filter the dataset only to contain the record which can be found in MySQL.
Here is the Dataset:
dataset.show()
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 2| b|
| 3| c|
+---+-----+
And here is the table in MySQL:
+---+-----+
| id| name|
+---+-----+
| 1| a|
| 3| c|
| 4| d|
+---+-----+
This is my code (running in spark-shell):
import java.util.Properties
case class App(id: Int, name: String)
val data = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val dataFrame = data.map { case (id, name) => App(id, name) }.toDF
val dataset = dataFrame.as[App]
val url = "jdbc:mysql://ip:port/tbl_name"
val table = "my_tbl_name"
val user = "my_user_name"
val password = "my_password"
val properties = new Properties()
properties.setProperty("user", user)
properties.setProperty("password", password)
dataset.filter((x: App) =>
0 != sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count).show()
But I get "java.lang.NullPointerException"
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)
I have tested
val x = App(1, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count
val y = App(5, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + y.id.toString), properties).count
and I can get the right result 1 and 0.
What's the problem with filter?
Upvotes: 3
Views: 872
Reputation: 330423
What's the problem with filter?
You get an exception because you're trying to execute an action (count
on a DataFrame
) inside a transformation (filter
). Neither nested actions nor transformations are supported in Spark.
Correct solution is as usual either join
on compatible data structures, lookup using local data structure or query directly against external system (without using Spark data structures).
Upvotes: 3