Reputation: 89
I'm trying to filter some values in my DataFrame (a Dataset[Row]). My code is the following:
```scala
var index : Int = 0
var set = SetBuilding(features, 3)
val soglia : Int = 30            // threshold
var exit : Boolean = false
while (!exit && index < set.length) {
  val attributes = set(index).split(",")
  var r = scala.util.Random
  var i = r.nextInt(attributes.length)
  var previousI = i
  var j : Int = 8
  var maxprojections : Int = 5
  var dataframe = sqlContext.sql("SELECT " + set(index) + " FROM table").cache()
  println("**************EVALUATING SUBSET: " + set(index) + "***********************")
  while (j != 0 && maxprojections >= 0) {
    // keep the rows whose value for attribute i is above the threshold
    // (ReturnType is a helper defined elsewhere, not shown)
    var filtered = dataframe.filter((elem : Row) => ReturnType(elem.get(elem.fieldIndex(attributes(i)))).>(soglia))
    println("projection on attribute " + attributes(i))
    for (elem <- filtered) {
      println(elem)
    }
    if (attributes.size != 1) {
      // pick a different attribute for the next projection
      do {
        i = r.nextInt(attributes.length)
      } while (i == previousI)
    }
    println("*********value of previousI = " + attributes(previousI) + "******************************")
    previousI = i
    j = filtered.count().toInt
    println("*********value of j = " + j + "******************************")
    maxprojections = maxprojections - 1
    println("*********value of maxproj = " + maxprojections + "******************************")
  }
  index += 1
  if (index >= 4)
    exit = true
}
```
The problem is that if I keep my data as a Spark DataFrame and call `filter()`, for some attributes I expect an empty DataFrame to be returned, but `filtered.count()` returns a value != 0, even though I'm sure those values are less than the threshold.
The problem does not occur when I call `collect()` on the filtered data.
Is there a solution that keeps the filtered data as a DataFrame?
I hope the question is now well posed.
Upvotes: 2
Views: 5707
Reputation: 37435
It seems that the expectation in the question is that each `filter` operation mutates the DataFrame, effectively removing elements from it.
That's an incorrect assumption.
Each DataFrame represents an immutable set of data. On each iteration of the loop, we obtain a new DataFrame that is the result of filtering the original `dataframe` with a single condition. So on each iteration the result looks like:
```scala
/** loop 1 **/ var filtered = dataframe.filter(col("attribute_1") > threshold)
/** loop 2 **/ var filtered = dataframe.filter(col("attribute_2") > threshold)
// ...
/** loop n **/ var filtered = dataframe.filter(col("attribute_n") > threshold)
```
What we observe at the end is the result of the final filter operation only, because each iteration assigns a brand-new `filtered = dataframe.filter(attribute_n > threshold)` computed from the original, unchanged `dataframe`.
All the other filter operations are lost.
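The same behaviour can be reproduced without Spark. The sketch below is only an analogy, not Spark code: it assumes hypothetical rows modelled as `Map[String, Int]` inside an immutable Scala `Vector`, whose `filter` (like `DataFrame.filter`) returns a new collection and leaves the original untouched. Unlike the question's code, `filtered` is declared outside the loop so its final value can be inspected.

```scala
// Analogy only: an immutable Vector of hypothetical rows stands in for the DataFrame.
object OverwriteDemo {
  val threshold = 30
  val rows: Vector[Map[String, Int]] = Vector(
    Map("a" -> 40, "b" -> 10),
    Map("a" -> 50, "b" -> 60),
    Map("a" -> 20, "b" -> 70)
  )
  val attributes = Vector("a", "b")

  // Like the question's loop: every iteration filters the ORIGINAL rows,
  // so the previous iteration's result is simply replaced, never refined.
  def lastFilterOnly(): Vector[Map[String, Int]] = {
    var filtered = rows
    for (attr <- attributes) {
      filtered = rows.filter(row => row(attr) > threshold)
    }
    filtered
  }
}
```

Here `OverwriteDemo.lastFilterOnly()` keeps the two rows with `b > 30`, even though one of them has `a = 20`, and `OverwriteDemo.rows` still contains all three rows: the filter on `a` had no lasting effect.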
If we want to remove elements iteratively, the quickest change is to stack the filters in the loop by reassigning the mutable variable, like this:
```scala
var filtered = dataframe
while (cond) {
  // each iteration filters the result of the previous one
  filtered = filtered.filter(col(attribute_i) > threshold)
}
```
Here the resulting `filtered` DataFrame has a logical plan made of several filter operations applied one after another.
It is equivalent to: `dataframe.filter(attribute_1 > threshold).filter(attribute_2 > threshold).filter(attribute_3 > threshold)...`
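To make the equivalence concrete without a Spark cluster, here is a minimal analogy, again assuming hypothetical rows modelled as `Map[String, Int]` in an immutable `Vector`. It compares the stacked loop, the explicit chain of `filter` calls, and a `foldLeft` that performs the same stacking without a mutable variable.

```scala
// Analogy only: hypothetical rows in an immutable Vector stand in for the DataFrame.
object StackedDemo {
  val threshold = 30
  val rows: Vector[Map[String, Int]] = Vector(
    Map("a" -> 40, "b" -> 10),
    Map("a" -> 50, "b" -> 60),
    Map("a" -> 20, "b" -> 70)
  )
  val attributes = Vector("a", "b")

  // Stacking: each iteration filters the RESULT of the previous iteration.
  def stacked(): Vector[Map[String, Int]] = {
    var filtered = rows
    for (attr <- attributes) {
      filtered = filtered.filter(row => row(attr) > threshold)
    }
    filtered
  }

  // The explicit chain that the stacked loop is equivalent to.
  val chained: Vector[Map[String, Int]] =
    rows.filter(row => row("a") > threshold).filter(row => row("b") > threshold)

  // The same stacking expressed as a fold, with no mutable variable.
  val folded: Vector[Map[String, Int]] =
    attributes.foldLeft(rows)((acc, attr) => acc.filter(row => row(attr) > threshold))
}
```

All three return only the row where both `a` and `b` exceed the threshold. On a real DataFrame the fold would read `attributes.foldLeft(dataframe)((df, a) => df.filter(col(a) > soglia))`, with `col` imported from `org.apache.spark.sql.functions`; this assumes the compared columns are numeric, whereas the question's code goes through its own `ReturnType` helper.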
Upvotes: 3