pallupz

Reputation: 883

Spark: Return empty column if column does not exist in dataframe

As shown in the code below, I am reading a JSON file into a dataframe and then selecting some fields from that dataframe into another one.

from pyspark.sql.functions import col

df_record = spark.read.json("path/to/file.JSON", multiLine=True)

df_basicInfo = df_record.select(col("key1").alias("ID"),
                                col("key2").alias("Status"),
                                col("key3.ResponseType").alias("ResponseType"),
                                col("key3.someIndicator").alias("SomeIndicator"))

The issue is that sometimes the JSON file does not have some of the keys that I try to fetch - like ResponseType. So it ends up throwing errors like:

org.apache.spark.sql.AnalysisException: No such struct field ResponseType

How can I get around this issue without forcing a schema at the time of read? Is it possible to make it return a NULL under that column when it is not available?

"How do I detect if a Spark DataFrame has a column" does explain how to detect whether a column is available in a dataframe. This question, however, is about how to use that function.

Upvotes: 14

Views: 38739

Answers (6)

Takreem

Reputation: 1

Another way to do this is to cast your struct-type column to string and use the get_json_object function, which can handle unknown fields. E.g.:

from pyspark.sql.functions import get_json_object
df_record.select(get_json_object(df_record.key3.cast("string"), '$.ResponseType').alias('ResponseType'))

The output will always be a string. The string 'null' will be the column value wherever the required field is not found.
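For illustration, a minimal runnable sketch of the same idea; the one assumption swapped in is to_json in place of the plain cast, since to_json always produces a valid JSON string from a struct column:

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, get_json_object, to_json

spark = SparkSession.builder.getOrCreate()

# Sample row whose key3 struct has no ResponseType field
df_record = spark.createDataFrame([Row(key3=Row(someIndicator="Y"))])

# Serialize the struct to a JSON string, then probe it for the field;
# get_json_object returns NULL when the path does not exist.
df_record.select(
    get_json_object(to_json(col("key3")), "$.ResponseType").alias("ResponseType")
).show()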

Upvotes: 0

user10456460

Reputation:

Using the has_column function defined here by zero323 and the general guidelines about adding empty columns, you can either:

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string")) 

Adjust the types according to your requirements, and repeat the process for the remaining columns.
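If there are several such fields, a small loop avoids repeating the if/else by hand. A minimal sketch, assuming the try/except has_column helper from the linked answer and a hypothetical wanted mapping of output names to source paths and fallback types:

from pyspark.sql.functions import col, lit
from pyspark.sql.utils import AnalysisException

def has_column(df, path):
    # Column resolution raises AnalysisException when the path does not exist
    try:
        df[path]
        return True
    except AnalysisException:
        return False

# Hypothetical mapping: output column name -> (source path, type for the NULL fallback)
wanted = {
    "ResponseType": ("key3.ResponseType", "string"),
    "SomeIndicator": ("key3.someIndicator", "string"),
}

df_basicInfo = df_record
for name, (path, dtype) in wanted.items():
    if has_column(df_record, path):
        df_basicInfo = df_basicInfo.withColumn(name, col(path))
    else:
        df_basicInfo = df_basicInfo.withColumn(name, lit(None).cast(dtype))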

Alternatively define a schema that covers all desired types:

schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key3", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

(once again adjust the types), and use your current code.
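Spelled out, this is what the schema-based variant looks like end to end. Reading with an explicit schema makes Spark return NULL for declared fields that are absent from the file, so the question's original select works unchanged (a sketch reusing the paths from the question):

from pyspark.sql.functions import col

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

# Fields declared in the schema but missing from the JSON come back as NULL,
# so this select no longer raises AnalysisException.
df_basicInfo = df_record.select(col("key1").alias("ID"),
                                col("key2").alias("Status"),
                                col("key3.ResponseType").alias("ResponseType"),
                                col("key3.someIndicator").alias("SomeIndicator"))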

Upvotes: 14

Hari_pb

Reputation: 7406

I saw many confusing answers, so I hope this helps. In PySpark, here is how you do it: create a function that checks each column to see if it exists, and if it doesn't, replace it with None (or a value of the relevant datatype).

from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit, col, when


def has_column(df, col_path):
    # Resolving a non-existent (possibly nested) column raises AnalysisException;
    # the parameter is named col_path to avoid shadowing the imported col function
    try:
        df[col_path]
        return True
    except AnalysisException:
        return False

Now, as mentioned in the question:

df_record = spark.read.json("path/to/file.JSON",multiLine=True)
df_new = df_record

if has_column(df_new, "data.col1"):
    df_new = df_new.withColumn("col_1", col("data.col1"))
else:
    df_new = df_new.withColumn("col_1", lit(None).cast("string")) 


if has_column(df_new, "log.id"):
    df_new = df_new.withColumn("log_id", col("log.id").cast("bigint"))
else:
    df_new = df_new.withColumn("log_id", lit(None).cast("bigint"))

.....

and so on; you keep making the relevant changes to the dataframe until all the fields you want to populate are present in df_new. Hope this helps!

Upvotes: 0

Hello.World

Reputation: 740

So I tried using the accepted answer; however, I found that if the column key3.ResponseType doesn't exist, it will fail.

You can do something like this -

import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

def hasColumn(df: DataFrame, path: String): Column =
  if (Try(df(path)).isSuccess) df(path) else lit(null)

Here the function evaluates whether the column exists, and if it doesn't, it just returns a NULL column.

You can now use this like -

val df_basicInfo = df_record.withColumn("ResponseType", hasColumn(df_record, "key3.ResponseType"))

Upvotes: 2

Roland Ebner

Reputation: 21

I had the same issue; I used a similar approach to Thomas's. My user-defined function code:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
    var buffer:Row = root

    if (buffer != null) {
      if (buffer.schema.fieldNames.contains(fieldName)) {
         buffer.getString(buffer.fieldIndex(fieldName))
      } else {
        null
      }
    }
    else {
      null
    }
})

and then my query:

%sql

SELECT
  Id,
  Created,
  Payload.Type,
  tryGet(Payload, "Error") as Error
FROM dataWithJson
WHERE Payload.Type = 'Action'

Upvotes: 2

Thomas Decaux

Reputation: 22651

Spark is missing a simple function: struct_has(STRUCT, PATH) or struct_get(STRUCT, PATH, DEFAULT), where PATH uses dot notation.

So I wrote a very simple UDF:

From https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {

    var fields = path.split("\\.")
    var buffer:Row = root
    val lastItem = fields.last

    fields = fields.dropRight(1)

    fields.foreach( (field:String) => {
        if (buffer != null) {
            if (buffer.schema.fieldNames.contains(field)) {
                buffer = buffer.getStruct(buffer.fieldIndex(field))
            } else {
                buffer = null
            }
        }
    })

    if (buffer == null) {
        defaultValue
    } else {
        buffer.getString(buffer.fieldIndex(lastItem))
    }
})

This lets you query like this:

SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA

Upvotes: 2
