Reputation: 873
I have JSON data in various JSON files, and the keys can differ from line to line, for example:
{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}
I want to aggregate the data on columns 'b', 'c', 'd' and 'f'. Column 'f' is not present in the given JSON file but could be present in the other files, so where it is missing we can use an empty string for that column.
I am reading the input file and aggregating the data like this:
import pyspark.sql.functions as f
df = spark.read.json(inputfile)
# fails with an AnalysisException when 'f' is absent from the file
df2 = df.groupby("b", "c", "d", "f").agg(f.sum(df["a"]))
This is the final output I want:
{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}
Can anyone please help? Thanks in advance!
Upvotes: 21
Views: 48792
Reputation: 36
Here is a Spark (Scala) function which you can use via df.transform:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

def addMissingColumn(
    colName: String,
    defaultColumn: Column = lit(null).cast(StringType)
): DataFrame => DataFrame = { df =>
  val noInfoPresent = !df.columns.contains(colName)
  if (noInfoPresent) df.withColumn(colName, defaultColumn) else df
}
Upvotes: 0
Reputation: 13926
You can check whether the column is available in the DataFrame and modify df
only if necessary:
if 'f' not in df.columns:
    df = df.withColumn('f', f.lit(''))
For nested schemas you may need to use df.schema, like below:
>>> df.printSchema()
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
Upvotes: 41
Reputation: 49
This function worked for me:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType

def detect_data(column, df, data_type):
    if column not in df.columns:
        ret = lit(None).cast(data_type)
    else:
        ret = col(column).cast(data_type)
    return ret

df = df.withColumn('f', detect_data('f', df, StringType()))
Upvotes: 2
Reputation: 5696
In case someone needs this in Scala (note that the result must be assigned outside the if, otherwise newDf is scoped to the block and df stays unchanged):
import org.apache.spark.sql.functions.lit

val newDf = if (df.columns.contains("f")) df else df.withColumn("f", lit(""))
Upvotes: 8