Reputation: 883
As shown in the code below, I am reading a JSON file into a dataframe and then selecting some fields from that dataframe into another one.
from pyspark.sql.functions import col

df_record = spark.read.json("path/to/file.JSON", multiLine=True)

df_basicInfo = df_record.select(
    col("key1").alias("ID"),
    col("key2").alias("Status"),
    col("key3.ResponseType").alias("ResponseType"),
    col("key3.someIndicator").alias("SomeIndicator"),
)
The issue is that sometimes the JSON file does not have some of the keys that I try to fetch, like ResponseType. So it ends up throwing errors like:
org.apache.spark.sql.AnalysisException: No such struct field ResponseType
How can I get around this issue without forcing a schema at the time of read? Is it possible to make it return a NULL under that column when it is not available?
The question "how do I detect if a spark dataframe has a column" does mention how to detect whether a column is available in a dataframe. This question, however, is about how to use that check.
Upvotes: 14
Views: 38739
Reputation: 1
Another way to do this is to cast your struct-type column to string and use the get_json_object function, which can handle unknown fields. E.g.:
from pyspark.sql.functions import get_json_object
df_record.select(get_json_object(df_record.key3.cast("string"), '$.ResponseType').alias('ResponseType'))
The output will always be a string. The string 'null' will be the column value where the required field is not found.
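A minimal sketch of the same idea; it differs from the snippet above in that it serializes the struct with to_json (which guarantees valid JSON text) before applying get_json_object, and it reuses the df_record and key3 names from the question:
from pyspark.sql.functions import get_json_object, to_json, col

# get_json_object returns NULL when the path is absent, so a missing
# ResponseType field no longer raises AnalysisException.
df_basicInfo = df_record.select(
    get_json_object(to_json(col("key3")), "$.ResponseType").alias("ResponseType"),
    get_json_object(to_json(col("key3")), "$.someIndicator").alias("SomeIndicator"),
)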
Upvotes: 0
Reputation:
Using the has_column function defined here by zero323, and general guidelines about adding empty columns, either:
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string"))
Adjust the types according to your requirements, and repeat the process for the remaining columns.
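For instance, a minimal sketch that repeats the pattern for every field the question selects (the path-to-column mapping is taken from the question's select; has_column is the helper linked above):
from pyspark.sql.functions import lit, col

# (source path, target column, type) for each field; adjust as needed.
wanted = [
    ("key1", "ID", "string"),
    ("key2", "Status", "string"),
    ("key3.ResponseType", "ResponseType", "string"),
    ("key3.someIndicator", "SomeIndicator", "string"),
]

df_basicInfo = df_record
for path, name, dtype in wanted:
    if has_column(df_record, path):
        df_basicInfo = df_basicInfo.withColumn(name, col(path).cast(dtype))
    else:
        df_basicInfo = df_basicInfo.withColumn(name, lit(None).cast(dtype))

df_basicInfo = df_basicInfo.select("ID", "Status", "ResponseType", "SomeIndicator")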
Alternatively, define a schema that covers all desired types:
schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key3", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)
(once again, adjust the types) and use your current code.
Upvotes: 14
Reputation: 7406
I saw many confusing answers, so I hope this helps in PySpark. Here is how you do it: create a function to check the columns, then check each column to see if it exists; if it does not, replace it with None or a value of the relevant datatype.
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit, col, when

def has_column(df, path):
    # Returns True if the (possibly nested) column path exists in df.
    try:
        df[path]
        return True
    except AnalysisException:
        return False
Now, as mentioned in the question:
df_record = spark.read.json("path/to/file.JSON", multiLine=True)
df_new = df_record

if has_column(df_new, "data.col1"):
    df_new = df_new.withColumn("col_1", col("data.col1"))
else:
    df_new = df_new.withColumn("col_1", lit(None).cast("string"))

if has_column(df_new, "log.id"):
    df_new = df_new.withColumn("log_id", col("log.id").cast("bigint"))
else:
    df_new = df_new.withColumn("log_id", lit(None).cast("bigint"))
.....
and so on; you make the relevant changes to the dataframe until you finally see all the fields you want to populate in df_new. Hope this helps!
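A small helper (the name add_or_null is hypothetical) can wrap the repeated if/else so each field becomes one line; a sketch, assuming has_column as defined above and the same data.col1 / log.id paths:
from pyspark.sql.functions import lit, col

def add_or_null(df, path, name, dtype="string"):
    # Copy the nested field if it exists, otherwise add a typed NULL column.
    if has_column(df, path):
        return df.withColumn(name, col(path).cast(dtype))
    return df.withColumn(name, lit(None).cast(dtype))

df_new = add_or_null(df_record, "data.col1", "col_1")
df_new = add_or_null(df_new, "log.id", "log_id", "bigint")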
Upvotes: 0
Reputation: 740
So I tried using the accepted answer; however, I found that if the column key3.ResponseType doesn't exist, it will fail.
You can do something like this -
import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

def hasColumn(df: DataFrame, path: String): Column =
  if (Try(df(path)).isSuccess) df(path) else lit(null)
Here you evaluate in the function whether the column exists; if it doesn't, it just returns a NULL column.
You can now use this like -
val df_basicInfo = df_record.withColumn("ResponseType", hasColumn(df_record, "key3.ResponseType"))
Upvotes: 2
Reputation: 21
I had the same issue; I used a similar approach to Thomas's. My user-defined function code:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("tryGet", (root: GenericRowWithSchema, fieldName: String) => {
  var buffer: Row = root
  if (buffer != null) {
    if (buffer.schema.fieldNames.contains(fieldName)) {
      buffer.getString(buffer.fieldIndex(fieldName))
    } else {
      null
    }
  } else {
    null
  }
})
and then my query:
%sql
SELECT
  Id,
  Created,
  Payload.Type,
  tryGet(Payload, "Error") as Error
FROM dataWithJson
WHERE Payload.Type = 'Action'
Upvotes: 2
Reputation: 22651
Spark is missing a simple function: struct_has(STRUCT, PATH) or struct_get(STRUCT, PATH, DEFAULT), where PATH uses dot notation.
So I wrote a very simple UDF:
From https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("struct_get", (root: GenericRowWithSchema, path: String, defaultValue: String) => {
  var fields = path.split("\\.")
  var buffer: Row = root
  val lastItem = fields.last

  // Walk down the struct one field at a time, stopping if a level is missing.
  fields = fields.dropRight(1)
  fields.foreach((field: String) => {
    if (buffer != null) {
      if (buffer.schema.fieldNames.contains(field)) {
        buffer = buffer.getStruct(buffer.fieldIndex(field))
      } else {
        buffer = null
      }
    }
  })

  if (buffer == null) {
    defaultValue
  } else {
    buffer.getString(buffer.fieldIndex(lastItem))
  }
})
This lets you query like this:
SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA
Upvotes: 2