Marko Bonaci

Reputation: 5706

What's the most efficient way to filter a DataFrame

... by checking whether a column's value is in a seq.
Perhaps I'm not explaining it very well; I basically want this (to express it using regular SQL): DF_Column IN seq?

First I did it using a broadcast var (where I placed the seq), a UDF (that did the checking), and registerTempTable.
The problem is that I didn't get to test it, since I ran into a known bug that apparently only appears when using registerTempTable with ScalaIDE.
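
For reference, a minimal sketch of that first approach (broadcast var, UDF, registerTempTable); the names sc, sqlContext, df and login here are placeholders, not code from the question:

// broadcast the seq of allowed values
val valid = sc.broadcast(Set("login2", "login3"))

// register the membership check as a SQL-callable UDF
sqlContext.udf.register("inSeq", (s: String) => valid.value.contains(s))

df.registerTempTable("logins")
val filtered = sqlContext.sql("SELECT * FROM logins WHERE inSeq(login)")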

I ended up creating a new DataFrame out of the seq and doing an inner join with it (intersection), but I doubt that's the most performant way to accomplish the task.
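
A minimal sketch of that join variant, again with placeholder names (sqlContext, df); the single-column join signature assumes Spark 1.4+:

val seqDF = sqlContext.createDataFrame(Seq("login2", "login3").map(Tuple1.apply)).toDF("login")

// inner join keeps only the rows whose login appears in the seq
val filtered = df.join(seqDF, "login")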

Thanks

EDIT: (in response to @YijieShen):
How do I filter based on whether elements of one DataFrame's column are in another DF's column (like SQL's select * from A where login in (select username from B))?

E.g: First DF:

login      count
login1     192  
login2     146  
login3     72   

Second DF:

username
login2
login3
login4

The result:

login      count
login2     146  
login3     72   

Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

and

ordered.select("login").filter($"login" in empLogins("username"))

which both throw Exception in thread "main" org.apache.spark.sql.AnalysisException, respectively:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

and

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);
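
For reference, the join route mentioned above can express this IN-subquery; a minimal sketch using a left semi join, assuming the ordered and empLogins DataFrames from the attempts ("leftsemi" keeps only the rows of ordered whose login appears in empLogins.username, without adding columns from empLogins):

val result = ordered.join(empLogins, ordered("login") === empLogins("username"), "leftsemi")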

Upvotes: 31

Views: 30710

Answers (2)

Iulian Dragos

Reputation: 5712

  1. You should broadcast a Set instead of an Array; membership lookups are then constant-time rather than linear scans (a minimal sketch is at the end of this answer).

  2. You can make Eclipse run your Spark application. Here's how:

As pointed out on the mailing list, spark-sql assumes its classes are loaded by the primordial classloader. That's not the case in Eclipse, where the Java and Scala libraries are loaded as part of the boot classpath, while the user code and its dependencies are in another one. You can easily fix that in the launch configuration dialog:

  • remove Scala Library and Scala Compiler from the "Bootstrap" entries
  • add (as external jars) scala-reflect, scala-library and scala-compiler to the user entry.

The dialog should look like this:

[screenshot: Eclipse launch configuration dialog]

Edit: The Spark bug was fixed and this workaround is no longer necessary (since v1.4.0).
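
Regarding point 1, a minimal sketch of the Set-based broadcast (assuming sc and a DataFrame df with a login column; all names are placeholders):

import org.apache.spark.sql.functions.{col, udf}

val valid = sc.broadcast(Set("login2", "login3"))          // Set gives O(1) membership checks
val inSet = udf((s: String) => valid.value.contains(s))
val filtered = df.filter(inSet(col("login")))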

Upvotes: 12

yjshen

Reputation: 6693

My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT on these two configurations:

  • IntelliJ IDEA's test runner
  • a Spark Standalone cluster with 8 nodes (1 master, 7 workers)

Please check if any differences exist:

import org.apache.spark.sql.functions.{col, udf}

// broadcast the lookup values so each executor gets a local copy
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")

// UDF that checks membership in the broadcast values
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))

xdf.show()
filtered.show()

Output

xdf.show():

name   cnt
login1 192
login2 146
login3 72

filtered.show():

name   cnt
login3 72

Upvotes: 16
