Reputation: 2705
I'm manually creating a dataframe for some testing. The code to create it is:
case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
                        input(1111,1,1001,10.00),
                        input(1111,0,1002,10.00)))
So the schema looks like this:
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
I want to make 'nullable = true' for each one of these variables. How do I declare that from the start, or switch it in a new DataFrame after it has been created?
Upvotes: 48
Views: 86061
Reputation: 702
As I came here searching for a pyspark solution and did not find one, here it is:
from pyspark.sql.types import StructType, StructField
df = sqlContext.createDataFrame(
    [(1, "a", 4), (3, "B", 5)], ("col1", "col2", "col3"))
df.show()
df.schema
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| 4|
| 3| B| 5|
+----+----+----+
StructType(
List(
StructField(col1,LongType,true),
StructField(col2,StringType,true),
StructField(col3,LongType,true)
)
)
schema = StructType()
for field in df.schema.fields:
    schema.add(StructField(field.name, field.dataType, False))
newdf = spark.createDataFrame(df.rdd, schema)
newdf.schema
StructType(
List(
StructField(col1,LongType,false),
StructField(col2,StringType,false),
StructField(col3,LongType,false)
)
)
Upvotes: 4
Reputation: 85
When you want to drop a column and create a new column in a Spark DataFrame, you can create the new column as nullable, as sketched below. The same approach works both when the new column is of type string and when it is of type integer.
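A minimal sketch of that idea, assuming a DataFrame df and a placeholder column name new_col (both hypothetical):
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StringType}

// Dropping the column and re-adding it from a null literal cast to the
// desired type yields a column with nullable = true.
val withNullableString = df.drop("new_col")
  .withColumn("new_col", lit(null).cast(StringType))   // string column, nullable
val withNullableInt = df.drop("new_col")
  .withColumn("new_col", lit(null).cast(IntegerType))  // integer column, nullable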
Upvotes: 0
Reputation: 196
Thanks Martin Senne. Just a little addition. In the case of nested struct types, you may need to set nullable recursively, like this:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  def set(st: StructType): StructType = {
    StructType(st.map {
      case StructField(name, dataType, _, metadata) =>
        val newDataType = dataType match {
          case t: StructType => set(t)
          case _ => dataType
        }
        StructField(name, newDataType, nullable = nullable, metadata)
    })
  }
  df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}
Upvotes: 4
Reputation: 1859
Another option: if you need to change the DataFrame in place and recreating it is impossible, you can do something like this:
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
Spark will then think that this column may contain null, and nullability will be set to true.
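For context, a minimal self-contained version of the same trick (df and col_name are placeholders, not names from the question):
import org.apache.spark.sql.functions.{col, lit, when}

// The when/otherwise expression can produce null, so Spark marks the
// resulting column as nullable.
val df2 = df.withColumn("col_name",
  when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))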
Also, you can use a udf to wrap your values in Option.
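A sketch of the udf variant, assuming an integer column (again, df and col_name are placeholders):
import org.apache.spark.sql.functions.{col, udf}

// Wrapping the value in Option makes the UDF's result type nullable.
val wrapInOption = udf((x: Int) => Option(x))
val df3 = df.withColumn("col_name", wrapInOption(col("col_name")))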
Works fine even for streaming cases.
Upvotes: 43
Reputation: 29
Just use java.lang.Integer instead of scala.Int in your case class.
case class input(id:Long, var1:java.lang.Integer, var2:java.lang.Integer, var3:java.lang.Double)
Upvotes: 2
Reputation: 621
Instead of case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) one can use _.copy(nullable = nullable). Then the whole function can be written as:
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
Upvotes: 10
Reputation: 656
This is a late answer, but I wanted to give an alternative solution for people who come here. You can automatically make a DataFrame column nullable from the start by the following modification to your code:
case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
                        input(Some(1111),Some(1),1001,10.00),
                        input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema
This will yield:
root
|-- id: long (nullable = true)
|-- var1: integer (nullable = true)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
Essentially, if you declare a field as an Option and use Some([element]) or None as the actual inputs, then that field will be nullable. Otherwise, the field will not be nullable. I hope this helps!
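For example, a hypothetical extra row (not part of the original data) where var1 is missing can be written with None:
input(Some(1110), None, 1001, -10.00)   // var1 will be null in the resulting DataFrame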
Upvotes: 17
Reputation: 6059
With the imports
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
you can use
/**
 * Set the nullable property of a column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is either nullable or not
 */
def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
  // get schema
  val schema = df.schema
  // modify the [[StructField]] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
directly.
Also, you can make the method available via the "pimp my library" pattern (see my SO post What is the best way to define custom methods on a DataFrame?), such that you can call
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
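A minimal sketch of such a wrapper, reusing the same schema-rewriting logic as above (the object and class names here are just placeholders):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

object DataFrameNullableExtensions {
  implicit class RichDataFrame(df: DataFrame) {
    // same logic as setNullableStateOfColumn above, exposed as a method on DataFrame
    def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
      val newSchema = StructType(df.schema.map {
        case StructField(c, t, _, m) if c == cn => StructField(c, t, nullable = nullable, m)
        case f => f
      })
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }
  }
}
// usage: import DataFrameNullableExtensions._ and then
// val df2 = df.setNullableStateOfColumn("id", true)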
To set the nullable state of all columns at once, use a slightly modified version of setNullableStateOfColumn:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  // get schema
  val schema = df.schema
  // modify all StructFields
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) ⇒ StructField(c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
Explicitly define the schema. (Use reflection to create a solution that is more general.)
configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id: Long, var1: Int, var2: Int, var3: Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._

  // use this to set the schema explicitly, or
  // use reflection on the case class members to construct the schema
  val schema = StructType(Seq(
    StructField("id", LongType, true),
    StructField("var1", IntegerType, true),
    StructField("var2", IntegerType, true),
    StructField("var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001, -10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] = sparkContext.parallelize(is)
  val rowRDD: RDD[Row] = rdd.map((i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame(rowRDD, schema)

  inputDF.printSchema
  inputDF.show()
}
Upvotes: 54