Jesus Zuñiga
Jesus Zuñiga

Reputation: 135

How can i check for empty values on spark Dataframe using User defined functions

guys, I have this user-defined function to check if the text rows are empty:

import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    {{{
      val df = Seq(
        (0, "","Mongo"),
        (1, "World","sql"),
        (2, "","")
        ).toDF("id", "text", "Source")

      // Define a "regular" Scala function
      val checkEmpty: String => Boolean =  x => {
        var test = false
        if(x.isEmpty){
          test = true
        }
        test
      }
      val upper = udf(checkEmpty)
      df.withColumn("isEmpty", upper('text)).show
    }}}

I'm actually getting this dataframe:

+---+-----+------+-------+
| id| text|Source|isEmpty|
+---+-----+------+-------+
|  0|     | Mongo|   true|
|  1|World|   sql|  false|
|  2|     |      |   true|
+---+-----+------+-------+

How could I check for all the rows for empty values and return a message like:

id 0 has the text column with empty values
id 2 has the text,source column with empty values

Upvotes: 2

Views: 4455

Answers (2)

pasha701
pasha701

Reputation: 7207

UDF which get nullable columns as Row can be used, for get empty column names. Then rows with non-empty columns can be filtered:

val emptyColumnList = (r: Row) => r
  .toSeq
  .zipWithIndex
  .filter(_._1.toString().isEmpty)
  .map(pair => r.schema.fields(pair._2).name)

val emptyColumnListUDF = udf(emptyColumnList)

val columnsToCheck = Seq($"text", $"Source")
val result = df
  .withColumn("EmptyColumns", emptyColumnListUDF(struct(columnsToCheck: _*)))
  .where(size($"EmptyColumns") > 0)
  .select(format_string("id %s has the %s columns with empty values", $"id", $"EmptyColumns").alias("description"))

Result:

+----------------------------------------------------+
|description                                         |
+----------------------------------------------------+
|id 0 has the [text] columns with empty values       |
|id 2 has the [text,Source] columns with empty values|
+----------------------------------------------------+

Upvotes: 3

Krzysztof Atłasik
Krzysztof Atłasik

Reputation: 22595

You could do something like this:

case class IsEmptyRow(id: Int, description: String) //case class for column names

val isEmptyDf = df.map { 
   row => row.getInt(row.fieldIndex("id")) -> row //we take id of row as first column
     .toSeq //then to get secod we change row values to seq
     .zip(df.columns) //zip it with column names
     .collect { //if value is string and empty we append column name
        case (value: String, column) if value.isEmpty => column
     }
}.map { //then we create description string and pack results to case class
   case (id, Nil)  => IsEmptyRow(id, s"id $id has no columns with empty values")
   case (id, List(column))  => IsEmptyRow(id, s"id $id has the $column column with empty values")
   case (id, columns) => IsEmptyRow(id, s"id $id has the ${columns.mkString(", ")} columns with empty values")
}

Then running isEmptyDf.show(truncate = false) will show:

+---+---------------------------------------------------+
|id |description                                        |
+---+---------------------------------------------------+
|0  |id 0 has the text columns with empty values        |
|1  |id 1 has no columns with empty values              |
|2  |id 2 has the text, Source columns with empty values|
+---+---------------------------------------------------+

You can also join back with original dataset:

df.join(isEmptyDf, "id").show(truncate = false)

Upvotes: 2

Related Questions