vsam490

Reputation: 89

spark filtering not working

I'm trying to filter some values in my DataFrame[Row]. The problem is the following:

var index: Int = 0
var set = SetBuilding(features, 3)
val soglia: Int = 30
var exit: Boolean = false

while (!exit && index < set.length) {
  val attributes = set(index).split(",")
  val r = scala.util.Random
  var i = r.nextInt(attributes.length)
  var previousI = i
  var j: Int = 8
  var maxprojections: Int = 5
  val dataframe = sqlContext.sql("SELECT " + set(index) + " FROM table").cache()
  println("************** EVALUATING SUBSET: " + set(index) + " ***********************")

  while (j != 0 && maxprojections >= 0) {

    var filtered = dataframe.filter((elem: Row) => ReturnType(elem.get(elem.fieldIndex(attributes(i)))) > soglia)
    println("projection on attribute " + attributes(i))
    for (elem <- filtered) {
      println(elem)
    }
    if (attributes.size != 1) {
      do {
        i = r.nextInt(attributes.length)
      } while (i == previousI)
    }
    println("********* value of previousI = " + attributes(previousI) + " ******************************")

    previousI = i

    j = filtered.count().toInt
    println("********* value of j = " + j + " ******************************")
    maxprojections = maxprojections - 1
    println("********* value of maxprojections = " + maxprojections + " ******************************")
  }
  index += 1
  if (index >= 4)
    exit = true
}

The problem is that if I keep my data structure as a Spark DataFrame and call filter(), on some attributes I expect an empty DataFrame to be returned, but calling filteredData.count gives a value != 0, even though I'm sure those values are less than the threshold.

The problem does not occur when I call collect() on filteredData.

Is there a solution that includes maintaining filteredData as a DataFrame?

I hope the question is now well posed.

Upvotes: 2

Views: 5707

Answers (1)

maasg

Reputation: 37435

It seems that the expectation in the context of the question is that each filter operation would mutate the DataFrame, effectively removing elements from it.

That's an incorrect assumption.

Each DataFrame represents an immutable set of data. On each iteration of the loop, we obtain a new DataFrame that is the result of filtering with a single condition. So at each loop iteration the result looks like:

/** loop 1 **/ var filtered = dataframe.filter(attribute_1 > threshold)
/** loop 2 **/ var filtered = dataframe.filter(attribute_2 > threshold)
...
/** loop n **/ var filtered = dataframe.filter(attribute_n > threshold)

What we observe at the end is only the result of the final filter operation overwriting the variable: var filtered = dataframe.filter(attribute_n > threshold). All other filter operations are lost.
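To see why, note that filter never modifies the DataFrame it is called on; it always returns a new one. A minimal sketch of this immutability (the column name x and the use of the Spark 2.x SparkSession API are illustrative assumptions, not part of the question's code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
val df = spark.range(10).toDF("x")   // hypothetical DataFrame with values 0 through 9

val filtered = df.filter("x > 5")    // a new DataFrame; df itself is untouched
println(df.count())                  // still 10 -- the original is immutable
println(filtered.count())            // 4

Reassigning filtered from dataframe on every loop iteration therefore discards the previous filter rather than narrowing the data further.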

If we wanted to iteratively remove elements in this context, the quick change would be to stack the filters in the loop by reusing the mutable variable, like this:

var filtered = dataframe
while (cond) {
    filtered = filtered.filter(attribute_i > threshold)
}

Here the resulting filtered DataFrame has a logical plan composed of the successive filter operations. It is equivalent to: dataframe.filter(attribute_1 > threshold).filter(attribute_2 > threshold).filter(attribute_3 > threshold)...
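Equivalently, the stacked filters can be built in one expression by folding over the attribute names, which avoids the mutable variable altogether. A sketch, assuming a DataFrame df whose listed columns should all exceed threshold (filterAll is a hypothetical helper name):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Chain one filter per attribute; each step returns a new DataFrame,
// so foldLeft threads the intermediate results through the chain.
def filterAll(df: DataFrame, attributes: Seq[String], threshold: Int): DataFrame =
  attributes.foldLeft(df)((acc, a) => acc.filter(col(a) > threshold))

Spark's optimizer typically collapses such consecutive filters into a single predicate at planning time, so this produces the same work as one filter whose conditions are combined with &&.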

Upvotes: 3
