gashu

Reputation: 873

pyspark dataframe add a column if it doesn't exist

I have JSON data in various JSON files, and the keys can differ from line to line, e.g.:

{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}

I want to aggregate the data on columns 'b', 'c', 'd' and 'f'. Column 'f' is not present in the given JSON file but could be present in the other files. Since column 'f' is not present here, we can use an empty string for that column.

I am reading the input file and aggregating the data like this:

import pyspark.sql.functions as f

df = spark.read.json(inputfile)
df2 = df.groupby("b", "c", "d", "f").agg(f.sum(df["a"]))

This is the final output I want:

{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}
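As a plain-Python sketch of the grouping semantics described above (no Spark involved): each JSON line is parsed into a dict, any missing grouping key is replaced by an empty string, and "a" is summed per group. The names LINES, GROUP_KEYS and aggregate are hypothetical, chosen only for illustration; this checks the expected output and is not how Spark executes the query.

```python
import json
from collections import defaultdict

# The three sample lines from the question; "f" never appears in this file.
LINES = [
    '{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}',
    '{"a":1 , "b":"abc2", "d":"abc"}',
    '{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}',
]
GROUP_KEYS = ["b", "c", "d", "f"]


def aggregate(lines, keys):
    """Group rows by `keys` (a missing key becomes "") and sum "a" per group."""
    totals = defaultdict(int)
    for line in lines:
        row = json.loads(line)
        group = tuple(row.get(k, "") for k in keys)
        totals[group] += row["a"]
    # Rebuild records in the question's shape: "a" first, then the grouping keys.
    return [{"a": total, **dict(zip(keys, group))} for group, total in totals.items()]


for record in aggregate(LINES, GROUP_KEYS):
    print(json.dumps(record))
```

Note that spark.read.json reads the missing "c" on the second line as null rather than "", so matching this output exactly in Spark would likely also need the nulls replaced (for example with something like df.fillna("")) after the missing columns are added.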

Can anyone please help? Thanks in advance!

Upvotes: 21

Views: 48792

Answers (4)

Max

Reputation: 36

Here is a Spark (Scala) function that you can use with df.transform:

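import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// Returns a DataFrame => DataFrame function that adds `colName`
// (filled with `defaultColumn`) only when the column is not already present.
def addMissingColumn(
    colName: String,
    defaultColumn: Column = lit(null).cast(StringType)
): DataFrame => DataFrame = { df =>
  if (df.columns.contains(colName)) df
  else df.withColumn(colName, defaultColumn)
}

// Usage: df.transform(addMissingColumn("f"))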
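Since the question is about PySpark, a rough Python counterpart of the Scala helper above could look like the sketch below. It is a translation under assumptions, not part of the original answer: add_missing_column is a hypothetical name, and unlike the Scala version the default column is a required argument so that the snippet itself needs no Spark import. Using it with DataFrame.transform assumes PySpark 3.0 or later.

```python
def add_missing_column(col_name, default_column):
    """Return a DataFrame -> DataFrame function that adds `col_name` only if absent.

    `default_column` is expected to be a Column expression, e.g. a typed null literal.
    """
    def _transform(df):
        if col_name in df.columns:
            return df  # column already present: leave the DataFrame unchanged
        return df.withColumn(col_name, default_column)

    return _transform
```

With PySpark it would be called as df.transform(add_missing_column("f", F.lit(None).cast("string"))), where F is pyspark.sql.functions.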

Upvotes: 0

Mariusz

Reputation: 13926

You can check whether the column exists in the DataFrame and modify df only if necessary:

# f is pyspark.sql.functions, imported as in the question
if 'f' not in df.columns:
    df = df.withColumn('f', f.lit(''))

For nested schemas, df.columns only lists top-level names, so you may need to inspect df.schema instead, as below:

>>> df.printSchema()
root
 |-- a: struct (nullable = true)
 |    |-- b: long (nullable = true)

>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
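For deeper nesting, the same check can be repeated one level at a time. A minimal sketch, assuming the schema is passed as the dict returned by PySpark's StructType.jsonValue() (i.e. has_nested_field(df.schema.jsonValue(), "a.b")); has_nested_field and SCHEMA are hypothetical names. It walks a dotted path through struct fields only and does not descend into arrays or maps, nor handle field names that themselves contain dots.

```python
def has_nested_field(schema_json, path):
    """Return True if a dotted path such as "a.b" exists in a schema dict.

    `schema_json` is assumed to have the shape produced by StructType.jsonValue():
    {"type": "struct", "fields": [{"name": ..., "type": ...}, ...]}.
    """
    current = schema_json
    for name in path.split("."):
        # Only struct types have named children.
        if not isinstance(current, dict) or current.get("type") != "struct":
            return False
        match = next((fld for fld in current["fields"] if fld["name"] == name), None)
        if match is None:
            return False
        current = match["type"]
    return True


# Hand-written equivalent of the schema printed above (root -> a: struct -> b: long).
SCHEMA = {
    "type": "struct",
    "fields": [
        {
            "name": "a",
            "type": {
                "type": "struct",
                "fields": [{"name": "b", "type": "long", "nullable": True, "metadata": {}}],
            },
            "nullable": True,
            "metadata": {},
        }
    ],
}

print(has_nested_field(SCHEMA, "a.b"))  # True
print(has_nested_field(SCHEMA, "a.x"))  # False
```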

Upvotes: 41

Joaquin

Reputation: 49

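This function worked for me:

from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType

def detect_data(column, df, data_type):
    # Missing column: a typed null literal; existing column: cast it to data_type
    if column not in df.columns:
        return lit(None).cast(data_type)
    return col(column).cast(data_type)

df = df.withColumn('f', detect_data('f', df, StringType()))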

Upvotes: 2

Javier Montón

Reputation: 5696

In case someone needs this in Scala:

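import org.apache.spark.sql.functions.lit

// Bind the result outside the if, so newDf is still in scope afterwards
val newDf = if (!df.columns.contains("f")) df.withColumn("f", lit("")) else df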

Upvotes: 8
