Reputation: 5706
... by checking whether a columns' value is in a seq
.
Perhaps I'm not explaining it very well, I basically want this (to express it using regular SQL): DF_Column IN seq
?
First I did it using a broadcast var
(where I placed the seq), UDF
(that did the checking) and registerTempTable
.
The problem is that I didn't get to test it since I ran into a known bug that apparently only appears when using registerTempTable
with ScalaIDE.
I ended up creating a new DataFrame
out of seq
and doing inner join with it (intersection), but I doubt that's the most performant way of accomplishing the task.
Thanks
EDIT: (in response to @YijieShen):
How to do filter
based on whether elements of one DataFrame
's column are in another DF's column (like SQL select * from A where login in (select username from B)
)?
E.g: First DF:
login count
login1 192
login2 146
login3 72
Second DF:
username
login2
login3
login4
The result:
login count
login2 146
login3 72
Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
and
ordered.select("login").filter($"login" in empLogins("username"))
which both throw Exception in thread "main" org.apache.spark.sql.AnalysisException
, respectively:
resolved attribute(s) username#10 missing from login#8 in operator
!Filter Contains(login#8, username#10);
and
resolved attribute(s) username#10 missing from login#8 in operator
!Filter login#8 IN (username#10);
Upvotes: 31
Views: 30710
Reputation: 5712
You should broadcast a Set
, instead of an Array
, much faster searches than linear.
You can make Eclipse run your Spark application. Here's how:
As pointed out on the mailing list, spark-sql assumes its classes are loaded by the primordial classloader. That's not the case in Eclipse, were the Java and Scala library are loaded as part of the boot classpath, while the user code and its dependencies are in another one. You can easily fix that in the launch configuration dialog:
scala-reflect
, scala-library
and scala-compiler
to the user entry.The dialog should look like this:
Edit: The Spark bug was fixed and this workaround is no longer necessary (since v. 1.4.0)
Upvotes: 12
Reputation: 6693
My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT
on these two configurations:
Intellij IDEA's test
Spark Standalone cluster
with 8 nodes (1 master, 7 worker)Please check if any differences exists
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))
xdf.show()
filtered.show()
Output
name cnt
login1 192
login2 146
login3 72name cnt
login3 72
Upvotes: 16