Rekha Pujari

Reputation: 43

How to get the set of rows which contain null values from a dataframe in Scala using filter

I'm new to Spark and have a question about filtering a dataframe based on a null condition. I have gone through many answers which have solutions like

df.filter(($"col2".isNotNULL) || ($"col2" !== "NULL")  || ($"col2" !== "null")  || ($"col2".trim !== "NULL"))

But in my case I cannot hard-code the column names, because my schema is not fixed. I am reading a CSV file, and depending on the columns in it, I have to filter my dataframe for null values and put those rows in another dataframe. In short, any row that has a null value in any column should go into a separate dataframe.

for example : Input DataFrame :

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n2|   2|     null|[c1,1,d1]|
|  n3|   3|[email protected]|     null|
|  n4|   4|[email protected]|[c2,2,d2]|
|  n6|   6|[email protected]|[c2,2,d2]|
+----+----+---------+---------+

Output :

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n2|   2|     null|[c1,1,d1]|
|  n3|   3|[email protected]|     null|
+----+----+---------+---------+

Thank you in advance.

Upvotes: 3

Views: 1608

Answers (3)

Rekha Pujari

Reputation: 43

Thank you so much for your answers. I tried the logic below and it worked for me.

    // Build a SQL predicate string of the form:
    // "col1 is null or col1 == '' or col2 is null or col2 == '' ..."
    val arrayColumn = df.columns
    val x = new StringBuilder(String.format(" %1$s is null or %1$s == '' ", arrayColumn(0)))
    for (i <- 1 until arrayColumn.length) {
      x ++= String.format("or %1$s is null or %1$s == '' ", arrayColumn(i))
    }
    val dfWithNullRows = df.filter(x.toString())
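
A more compact way to build the same predicate is with mkString (a sketch of the same idea; like the loop above, it assumes comparing every column to the empty string is acceptable):

    // Same predicate, built in one expression instead of a StringBuilder
    val condition = df.columns.map(c => s"$c is null or $c == ''").mkString(" or ")
    val dfWithNullRows = df.filter(condition)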

Upvotes: 1

Chema

Reputation: 2828

Spark has some useful functions for dealing with null values in dataframes.

I will show some examples with dataframes that have different numbers of columns.

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types.{StructType, StructField, IntegerType, DoubleType, StringType}

      val schema = StructType(List(StructField("id", IntegerType, true), StructField("obj", DoubleType, true)))
      // (schema1 has two columns named "obj", matching the output below)
      val schema1 = StructType(List(StructField("id", IntegerType, true), StructField("obj", StringType, true), StructField("obj", IntegerType, true)))

      val t1 = sc.parallelize(Seq((1, null), (1, 1.0), (8, 3.0), (2, null), (3, 1.4), (3, 2.5), (null, 3.7))).map(t => Row(t._1, t._2))
      val t2 = sc.parallelize(Seq((1, "A", null), (2, "B", null), (3, "C", 36), (null, "D", 15), (5, "E", 25), (6, null, 7), (7, "G", null))).map(t => Row(t._1, t._2, t._3))
      val tt1 = spark.createDataFrame(t1, schema)
      val tt2 = spark.createDataFrame(t2, schema1)

      tt1.show()
      tt2.show()

      // To drop all rows containing null values
      val dfWithoutNull = tt1.na.drop()
      dfWithoutNull.show()

      val df2WithoutNull = tt2.na.drop()
      df2WithoutNull.show()

      // To fill null values with another value
      val df1 = tt1.na.fill(-1)
      df1.show()

      // To get new dataframes containing only the rows with null values
      val nullValues = tt1.filter(row => row.anyNull)
      nullValues.show()

      val nullValues2 = tt2.filter(row => row.anyNull)
      nullValues2.show()

Output:

// input dataframes
+----+----+
|  id| obj|
+----+----+
|   1|null|
|   1| 1.0|
|   8| 3.0|
|   2|null|
|   3| 1.4|
|   3| 2.5|
|null| 3.7|
+----+----+

+----+----+----+
|  id| obj| obj|
+----+----+----+
|   1|   A|null|
|   2|   B|null|
|   3|   C|  36|
|null|   D|  15|
|   5|   E|  25|
|   6|null|   7|
|   7|   G|null|
+----+----+----+

// Dataframes without null values
+---+---+
| id|obj|
+---+---+
|  1|1.0|
|  8|3.0|
|  3|1.4|
|  3|2.5|
+---+---+

+---+---+---+
| id|obj|obj|
+---+---+---+
|  3|  C| 36|
|  5|  E| 25|
+---+---+---+

// Dataframe with null values replaced
+---+----+
| id| obj|
+---+----+
|  1|-1.0|
|  1| 1.0|
|  8| 3.0|
|  2|-1.0|
|  3| 1.4|
|  3| 2.5|
| -1| 3.7|
+---+----+

// Dataframes which the rows have at least one null value
+----+----+
|  id| obj|
+----+----+
|   1|null|
|   2|null|
|null| 3.7|
+----+----+

+----+----+----+
|  id| obj| obj|
+----+----+----+
|   1|   A|null|
|   2|   B|null|
|null|   D|  15|
|   6|null|   7|
|   7|   G|null|
+----+----+----+
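
For completeness, na.drop and na.fill also take finer-grained options; a minimal sketch against tt1 from above:

      // Drop a row only when every column is null
      tt1.na.drop("all").show()
      // Consider only the "id" column when deciding what to drop
      tt1.na.drop(Seq("id")).show()
      // Fill nulls with per-column replacement values
      tt1.na.fill(Map("id" -> -1, "obj" -> -1.0)).show()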

Upvotes: 0

Som

Reputation: 6323

Try this-


    val df1 = spark.sql("select col1, col2 from values (null, 1), (2, null), (null, null), (1,2) T(col1, col2)")
    /**
      * +----+----+
      * |col1|col2|
      * +----+----+
      * |null|1   |
      * |2   |null|
      * |null|null|
      * |1   |2   |
      * +----+----+
      */

    df1.show(false)

    // OR together the per-column isNull predicates to keep rows where any column is null
    import org.apache.spark.sql.functions.col
    df1.filter(df1.columns.map(col(_).isNull).reduce(_ || _)).show(false)

    /**
      * +----+----+
      * |col1|col2|
      * +----+----+
      * |null|1   |
      * |2   |null|
      * |null|null|
      * +----+----+
      */
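
Negating each per-column predicate and reducing with && gives the complement (rows with no nulls); a minimal sketch:

    // Rows where every column is non-null (the complement of the filter above)
    df1.filter(df1.columns.map(col(_).isNotNull).reduce(_ && _)).show(false)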

Upvotes: 1
