Reputation: 135
guys, I have this user-defined function to check if the text rows are empty:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
{{{
val df = Seq(
(0, "","Mongo"),
(1, "World","sql"),
(2, "","")
).toDF("id", "text", "Source")
// Define a "regular" Scala function
val checkEmpty: String => Boolean = x => {
var test = false
if(x.isEmpty){
test = true
}
test
}
val upper = udf(checkEmpty)
df.withColumn("isEmpty", upper('text)).show
}}}
I'm actually getting this dataframe:
+---+-----+------+-------+
| id| text|Source|isEmpty|
+---+-----+------+-------+
| 0| | Mongo| true|
| 1|World| sql| false|
| 2| | | true|
+---+-----+------+-------+
How could I check for all the rows for empty values and return a message like:
id 0 has the text column with empty values
id 2 has the text,source column with empty values
Upvotes: 2
Views: 4455
Reputation: 7207
UDF which get nullable columns as Row can be used, for get empty column names. Then rows with non-empty columns can be filtered:
val emptyColumnList = (r: Row) => r
.toSeq
.zipWithIndex
.filter(_._1.toString().isEmpty)
.map(pair => r.schema.fields(pair._2).name)
val emptyColumnListUDF = udf(emptyColumnList)
val columnsToCheck = Seq($"text", $"Source")
val result = df
.withColumn("EmptyColumns", emptyColumnListUDF(struct(columnsToCheck: _*)))
.where(size($"EmptyColumns") > 0)
.select(format_string("id %s has the %s columns with empty values", $"id", $"EmptyColumns").alias("description"))
Result:
+----------------------------------------------------+
|description |
+----------------------------------------------------+
|id 0 has the [text] columns with empty values |
|id 2 has the [text,Source] columns with empty values|
+----------------------------------------------------+
Upvotes: 3
Reputation: 22595
You could do something like this:
case class IsEmptyRow(id: Int, description: String) //case class for column names
val isEmptyDf = df.map {
row => row.getInt(row.fieldIndex("id")) -> row //we take id of row as first column
.toSeq //then to get secod we change row values to seq
.zip(df.columns) //zip it with column names
.collect { //if value is string and empty we append column name
case (value: String, column) if value.isEmpty => column
}
}.map { //then we create description string and pack results to case class
case (id, Nil) => IsEmptyRow(id, s"id $id has no columns with empty values")
case (id, List(column)) => IsEmptyRow(id, s"id $id has the $column column with empty values")
case (id, columns) => IsEmptyRow(id, s"id $id has the ${columns.mkString(", ")} columns with empty values")
}
Then running isEmptyDf.show(truncate = false)
will show:
+---+---------------------------------------------------+
|id |description |
+---+---------------------------------------------------+
|0 |id 0 has the text columns with empty values |
|1 |id 1 has no columns with empty values |
|2 |id 2 has the text, Source columns with empty values|
+---+---------------------------------------------------+
You can also join back with original dataset
:
df.join(isEmptyDf, "id").show(truncate = false)
Upvotes: 2