Anakata

Reputation: 17

Filtering records by whether a certain column exists throws java.lang.NullPointerException

I have a DataFrame of records in this format:

{
    "table": "SYSMAN.EM_METRIC_COLUMN_VER_E",
    "op_type": "I",
    "op_ts": "2021-03-24 13:15:31.396105",
    "pos": "00000000000000000000",
    "after": {
        "METRIC_GROUP_ID": 4700,
        "METRIC_COLUMN_ID": 293339,
        "METRIC_GROUP_VERSION_ID": 41670
    }
}

I want to filter these records based on whether a certain column exists. If the "after" struct contains that column (e.g. METRIC_GROUP_ID, METRIC_COLUMN_ID, METRIC_GROUP_VERSION_ID), I want to add the record to a list.

This is the code I have written:

import org.apache.spark.sql.Row
import scala.util.Try

def HasColumn(row: Row, Column: String) =
  Try(row.getAs[Row]("before").getAs[Any](Column)).isSuccess ||
    Try(row.getAs[Row]("after").getAs[Any](Column)).isSuccess

var records_list: List[Row] = null

for (row <- inputDS) {
  if (HasColumn(row, Column_String)) records_list :+ row
}

The last line throws the following exception:

21/06/02 21:54:02 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 12)

java.lang.NullPointerException

I am aware that you cannot access Spark's driver-side abstractions (RDDs, DataFrames, Datasets, SparkSession, ...) from within a function passed to a DataFrame/RDD transformation, because they exist only in the driver application. I tried to avoid that as much as possible, but I still haven't found a solution.
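One likely source of the NullPointerException can be reproduced in plain Scala, without Spark: `records_list` is initialised to `null`, so `records_list :+ row` calls a method on a null reference. Even with a non-null `List`, `:+` returns a new list instead of modifying the old one, so the result is silently discarded. And because the `for` loop desugars to `inputDS.foreach(...)`, whose closure runs on the executors, a driver-side variable would not be updated anyway. The sketch below is only an illustration: a local `Seq[String]` and a hypothetical `hasColumn` predicate stand in for the Dataset and `HasColumn`. On a real Dataset, the counterpart of `kept` would be `inputDS.filter(...).collect()`, which returns the matching rows to the driver as an array.

```scala
import scala.util.Try

object ListAppendDemo {
  // Local stand-in for the Dataset (hypothetical data).
  val rows: Seq[String] = Seq("a", "b", "c")

  // Hypothetical stand-in for HasColumn: keeps every row except "b".
  def hasColumn(row: String): Boolean = row != "b"

  // Mirrors `var records_list: List[Row] = null` from the question.
  var nullList: List[String] = null

  // Calling :+ on a null reference throws NullPointerException; Try captures it.
  val appendToNull: Try[List[String]] = Try(nullList :+ "a")

  // A non-null List is immutable: `:+` builds a new list, which is discarded here.
  var discarded: List[String] = List.empty
  for (row <- rows) { if (hasColumn(row)) { discarded :+ row } }

  // Idiomatic alternative: filter and keep the result.
  val kept: List[String] = rows.filter(hasColumn).toList
}
```

Here `appendToNull` is a `Failure` wrapping a `NullPointerException`, `discarded` is still empty after the loop, and `kept` is `List(a, c)`.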

Upvotes: 0

Views: 679

Answers (2)

kanielc

Reputation: 1322

I can't tell whether you have to do this with an RDD or a Dataset. With an RDD, the solution could look like this function:

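import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.util.Try

def filterData(data: RDD[Row], column: String): RDD[Row] = {
  data.filter { r =>
    // Keep the row if the column can be read from "before" or, failing that, "after"
    Try(r.getAs[Row]("before").getAs[Any](column))
      .orElse(Try(r.getAs[Row]("after").getAs[Any](column)))
      .isSuccess
  }
}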
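To see how `Try(...).orElse(Try(...))` decides, here is a plain-Scala sketch in which a `Map[String, Any]` is a hypothetical stand-in for `Row`: looking up a missing field throws, much as `Row.getAs(fieldName)` does for a field that is not in the schema, and the second `Try` is only evaluated when the first one fails, because `orElse` takes its argument by name. The helpers `field` and `struct` are illustrative, not Spark API.

```scala
import scala.util.Try

object OrElseDemo {
  // Hypothetical stand-in for a Spark Row: field name -> value.
  type Record = Map[String, Any]

  // Looking up a missing field throws, much like Row.getAs(fieldName).
  def field(r: Record, name: String): Any =
    r.getOrElse(name, throw new IllegalArgumentException(s"$name does not exist"))

  // Reads a nested struct (another Record) by name.
  def struct(r: Record, name: String): Record = field(r, name).asInstanceOf[Record]

  // Same shape as the RDD filter: try "before" first, fall back to "after".
  def hasColumn(r: Record, column: String): Boolean =
    Try(field(struct(r, "before"), column))
      .orElse(Try(field(struct(r, "after"), column)))
      .isSuccess

  // The sample record from the question (op_ts and pos omitted).
  val record: Record = Map(
    "table" -> "SYSMAN.EM_METRIC_COLUMN_VER_E",
    "op_type" -> "I",
    "after" -> Map[String, Any](
      "METRIC_GROUP_ID" -> 4700,
      "METRIC_COLUMN_ID" -> 293339,
      "METRIC_GROUP_VERSION_ID" -> 41670
    )
  )
}
```

With the sample record, `hasColumn(record, "METRIC_COLUMN_ID")` is `true` (the "before" lookup fails, the "after" one succeeds) and `hasColumn(record, "Column_does_not_exist")` is `false`.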

To reduce the amount of code, you can write:

def filterData(data: RDD[Row], column: String): RDD[Row] = {
  data.filter { r =>
    // One Try per parent struct; the row is kept if any of them succeeds
    Seq("before", "after").map(c => Try(r.getAs[Row](c)).map(_.getAs[Any](column)))
      .reduce(_ orElse _).isSuccess
  }
}

The advantage is that if you want to search more places than just before and after, you only have to add them to the Seq.
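The payoff of the `Seq`-based version can be seen in a plain-Scala sketch. This is not Spark code: a `Map[String, Any]` is a hypothetical stand-in for `Row`, and the `"key"` struct is a hypothetical third location to search. Adding a location is only a change to the `parents` argument, while `reduce(_ orElse _)` keeps the first successful lookup.

```scala
import scala.util.Try

object ReduceOrElseDemo {
  // Hypothetical stand-in for a Spark Row: field name -> value.
  type Record = Map[String, Any]

  // One Try per parent struct, combined with orElse: true if any lookup succeeds.
  // A missing key throws NoSuchElementException; a null struct makes the inner
  // lookup throw NullPointerException, which Try.map also captures.
  def hasColumn(r: Record, column: String, parents: Seq[String]): Boolean =
    parents
      .map(p => Try(r(p).asInstanceOf[Record]).map(s => s(column)))
      .reduce(_ orElse _)
      .isSuccess

  val record: Record = Map(
    "before" -> null,
    "after" -> Map[String, Any]("METRIC_GROUP_ID" -> 4700),
    // Hypothetical third struct to search.
    "key" -> Map[String, Any]("METRIC_COLUMN_ID" -> 293339)
  )
}
```

`hasColumn(record, "METRIC_COLUMN_ID", Seq("before", "after"))` is `false`, but adding `"key"` to the `Seq` makes it `true`. Note that `reduce` throws on an empty `Seq`, so `parents` must contain at least one name.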

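With a Dataset, you only need to check that the column is non-null, since the schema already determines which columns exist (for a field nested in a struct, pass the full path, e.g. after.METRIC_COLUMN_ID):

import org.apache.spark.sql.functions.col

df.where(col(column).isNotNull)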

Realistically, both approaches assume a fixed schema (even if inferred), so the Dataset version is much simpler.
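Why a fixed schema matters can be shown with a simplified plain-Scala model of schema inference. This is an assumption-laden sketch, not Spark's actual code: Spark's JSON reader builds one schema covering the fields seen across all records, and a record that lacks a field gets `null` for it. Once every row shares that schema, a `Try`-based lookup succeeds for every row, so only a non-null check (the counterpart of `col(...).isNotNull`) tells the records apart. The second record and the `lookup` helper are hypothetical.

```scala
import scala.util.Try

object FixedSchemaDemo {
  // Two raw "after" structs with different fields; the second is hypothetical.
  val raw: Seq[Map[String, Any]] = Seq(
    Map("METRIC_GROUP_ID" -> 4700, "METRIC_COLUMN_ID" -> 293339),
    Map("METRIC_GROUP_ID" -> 4701)
  )

  // Simplified schema inference: the union of field names seen in any record.
  val schema: Seq[String] = raw.flatMap(_.keys).distinct

  // Each record is padded to the shared schema; absent fields become null.
  val rows: Seq[Map[String, Any]] =
    raw.map(r => schema.map(f => f -> r.getOrElse(f, null)).toMap)

  // Lookup that throws only when the field is not part of the schema.
  def lookup(r: Map[String, Any], f: String): Any =
    if (schema.contains(f)) r(f) else throw new IllegalArgumentException(s"$f does not exist")

  // Try-based existence check: succeeds for every row once the schema is shared.
  val byTry: Seq[Boolean] = rows.map(r => Try(lookup(r, "METRIC_COLUMN_ID")).isSuccess)

  // Non-null check, the analogue of col("after.METRIC_COLUMN_ID").isNotNull.
  val byNotNull: Seq[Boolean] = rows.map(r => lookup(r, "METRIC_COLUMN_ID") != null)
}
```

In this model `byTry` is `List(true, true)` while `byNotNull` is `List(true, false)`.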

Upvotes: 0

s.polam

Reputation: 10382

Try the code below.

Create a UDF:

scala> def hasColumn = udf((row:Row,column:String) => Try(row.getAs[Row]("before").getAs[Any](column)).isSuccess || Try(row.getAs[Row]("after").getAs[Any](column)).isSuccess)

Use the UDF to check whether the column is available:

scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after              |op_ts                     |op_type|pos                 |table                        |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I      |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+

Then add a filter condition on the new column:

scala> df.withColumn("has",hasColumn(struct($"*"),lit("METRIC_COLUMN_ID"))).filter($"has" === true).show(false)
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|after              |op_ts                     |op_type|pos                 |table                        |has |
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
|[293339,4700,41670]|2021-03-24 13:15:31.396105|I      |00000000000000000000|SYSMAN.EM_METRIC_COLUMN_VER_E|true|
+-------------------+--------------------------+-------+--------------------+-----------------------------+----+
scala> df.withColumn("has",hasColumn(struct($"*"),lit("Column_does_not_exist"))).filter($"has" === true).show(false)
+-----+-----+-------+---+-----+---+
|after|op_ts|op_type|pos|table|has|
+-----+-----+-------+---+-----+---+
+-----+-----+-------+---+-----+---+

Upvotes: 1
