J Calbreath

Reputation: 2705

Change nullable property of column in spark dataframe

I'm manually creating a dataframe for some testing. The code to create it is:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

So the schema looks like this:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

I want to make 'nullable = true' for each one of these variables. How do I declare that from the start or switch it in a new dataframe after it's been created?

Upvotes: 48

Views: 86061

Answers (8)

jugi

Reputation: 702

As I came here searching for a pyspark solution and did not find one, here it is:

from pyspark.sql.types import StructType, StructField

df = spark.createDataFrame(
    [(1, "a", 4), (3, "B", 5)], ("col1", "col2", "col3"))

df.show()
df.schema

+----+----+----+  
|col1|col2|col3|  
+----+----+----+  
|   1|   a|   4|  
|   3|   B|   5|  
+----+----+----+  

StructType(
    List(
        StructField(col1,LongType,true),
        StructField(col2,StringType,true),
        StructField(col3,LongType,true)
    )
)

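# Rebuild the schema field by field; the third argument of StructField is the nullable flag
# (False here; pass True to make the columns nullable, as the question asks)
schema = StructType()
for field in df.schema.fields:
    schema.add(StructField(field.name, field.dataType, False))
newdf = spark.createDataFrame(df.rdd, schema)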

newdf.schema
StructType(
    List(
        StructField(col1,LongType,false),
        StructField(col2,StringType,false),
        StructField(col3,LongType,false)
    )
)
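
Note that the loop above sets every column to non-nullable (False). As a minimal sketch of a per-column variant, the helper below works on the plain-dict form of a schema (the shape returned by df.schema.jsonValue()), so it can be tried without a Spark session. The name with_nullable and its parameters are hypothetical, not part of PySpark, and the example dict is hand-written rather than captured from Spark.

```python
import copy


def with_nullable(schema_json, nullable, columns=None):
    """Return a copy of a struct-schema dict with `nullable` set on top-level fields.

    schema_json: dict shaped like the output of StructType.jsonValue()
    columns: field names to change; None means every field
    (hypothetical helper, not a PySpark API)
    """
    wanted = None if columns is None else set(columns)
    result = copy.deepcopy(schema_json)  # leave the caller's dict untouched
    for field in result["fields"]:
        if wanted is None or field["name"] in wanted:
            field["nullable"] = nullable
    return result


# Hand-written stand-in for df.schema.jsonValue(); not output captured from Spark.
example = {
    "type": "struct",
    "fields": [
        {"name": "col1", "type": "long", "nullable": False, "metadata": {}},
        {"name": "col2", "type": "string", "nullable": False, "metadata": {}},
        {"name": "col3", "type": "long", "nullable": False, "metadata": {}},
    ],
}

# Make only col1 and col3 nullable; col2 keeps its original flag.
relaxed = with_nullable(example, True, columns=["col1", "col3"])
print([(f["name"], f["nullable"]) for f in relaxed["fields"]])
```

With PySpark available, the resulting dict could be turned back into a schema with StructType.fromJson(relaxed) and applied with spark.createDataFrame(df.rdd, ...) in the same way as above.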

Upvotes: 4

Hemanth Vatti

Reputation: 85

If you are dropping a column and creating a new one in a Spark DataFrame, you can create the replacement as a nullable column like this:

  1. df.withColumn("Employee_Name", when(lit('') == '', '').otherwise(lit(None)))

NOTE: This creates a nullable column of type string.

  2. df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))

NOTE: This creates a nullable column of type integer.

Upvotes: 0

skotlov

Reputation: 196

Thanks Martin Senne. Just a little addition. In case of inner struct types, you may need to set nullable recursively, like this:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  // rebuild the schema, descending into nested structs
  def set(st: StructType): StructType = {
    StructType(st.map {
      case StructField(name, dataType, _, metadata) =>
        val newDataType = dataType match {
          case t: StructType => set(t)
          case _ => dataType
        }
        StructField(name, newDataType, nullable = nullable, metadata)
    })
  }

  df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}
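
For PySpark users, the same recursive idea can be sketched on the dict form of the schema (the shape returned by df.schema.jsonValue()). This is only an illustration under stated assumptions: set_nullable_recursive is a hypothetical name, the nested example is hand-written, and unlike the Scala version above it also descends into array elements and map values to reach structs nested there, while leaving containsNull and valueContainsNull untouched.

```python
def set_nullable_recursive(data_type, nullable):
    """Return a new schema dict with `nullable` set on every struct field.

    Hypothetical helper operating on the JSON/dict form of a Spark schema.
    """
    # Primitive types are plain strings such as "long" or "string".
    if not isinstance(data_type, dict):
        return data_type
    kind = data_type.get("type")
    if kind == "struct":
        return {
            **data_type,
            "fields": [
                {
                    **field,
                    "nullable": nullable,
                    "type": set_nullable_recursive(field["type"], nullable),
                }
                for field in data_type["fields"]
            ],
        }
    if kind == "array":
        # Only structs inside the array are changed; containsNull is left as-is.
        return {
            **data_type,
            "elementType": set_nullable_recursive(data_type["elementType"], nullable),
        }
    if kind == "map":
        # Only structs inside the map values are changed.
        return {
            **data_type,
            "valueType": set_nullable_recursive(data_type["valueType"], nullable),
        }
    return data_type


# Hand-written nested schema, standing in for df.schema.jsonValue().
nested = {
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": False, "metadata": {}},
        {
            "name": "address",
            "type": {
                "type": "struct",
                "fields": [
                    {"name": "city", "type": "string", "nullable": False, "metadata": {}}
                ],
            },
            "nullable": False,
            "metadata": {},
        },
        {
            "name": "tags",
            "type": {
                "type": "array",
                "elementType": {
                    "type": "struct",
                    "fields": [
                        {"name": "label", "type": "string", "nullable": False, "metadata": {}}
                    ],
                },
                "containsNull": False,
            },
            "nullable": False,
            "metadata": {},
        },
    ],
}

all_nullable = set_nullable_recursive(nested, True)
```

With PySpark available, StructType.fromJson(all_nullable) should give a schema that can be passed to createDataFrame together with df.rdd.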

Upvotes: 4

Rayan Ral

Reputation: 1859

Another option, if you need to change the DataFrame in place and recreating it is impossible, is something like this:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

Spark then assumes that this column may contain null, so its nullability is set to true. You can also use a udf to wrap your values in Option. This works fine even in streaming cases.

Upvotes: 43

echo

Reputation: 29

Just use java.lang.Integer instead of scala.Int in your case class.

case class input(id:Long, var1:java.lang.Integer, var2:java.lang.Integer, var3:java.lang.Double)

Upvotes: 2

matemaciek

Reputation: 621

More compact version of setting the nullable flag on all columns

Instead of case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) one can use _.copy(nullable = nullable). Then the whole function can be written as:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}

Upvotes: 10

Sidd Singal

Reputation: 656

This is a late answer, but wanted to give an alternative solution for people that come here. You can automatically make a DataFrame Column nullable from the start by the following modification to your code:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
    input(Some(1111),Some(1),1001,10.00),
    input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema

This will yield:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

Essentially, if you declare a field as an Option and pass Some([element]) or None as the actual input, that field will be nullable. Otherwise, the field will not be nullable. I hope this helps!

Upvotes: 17

Martin Senne

Reputation: 6059

Answer

With the imports

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

you can use

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify [[StructField]] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

directly.

You can also make the method available via the "pimp my library" pattern (see my SO post "What is the best way to define custom methods on a DataFrame?"), so that you can call

val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )

Edit

Alternative solution 1

Use a slightly modified version of setNullableStateOfColumn:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // set the nullable flag on every [[StructField]]
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

Alternative solution 2

Explicitly define the schema. (Use reflection to create a more general solution.)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly or
  // use reflection on the case class members to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}

Upvotes: 54
