Reputation: 17111
The column names in this example from the Spark SQL documentation come from the case class Person:
case class Person(name: String, age: Int)
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
However, in many cases the parameter names may change. Columns would then not be found if the file has not been updated to reflect the change.
How can I specify an appropriate mapping?
I am thinking something like:
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val ps: Seq[Person] = ???
val personRDD = sc.parallelize(ps)
// Apply the schema to the RDD.
val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
Upvotes: 10
Views: 19487
Reputation: 6059
Basically, all the mapping you need can be achieved with DataFrame.select(...). (Here I assume that no type conversions need to be done.)
Given the forward and backward mappings as maps, the essential part is
// personsDF is your original DataFrame
val mapping = from.map { (x: (String, String)) => personsDF(x._1).as(x._2) }.toArray
val mappedDF = personsDF.select(mapping: _*)
where mapping is an array of Columns with aliases.
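Since the backward mapping is just the forward mapping with keys and values swapped, it can be derived instead of maintained by hand. The following is a minimal sketch in plain Scala (no Spark required), assuming the mapping is meant to be one-to-one. `ColumnMapping` and `invert` are hypothetical names introduced for illustration; they are not part of Spark.

```scala
// Hypothetical helper, not part of Spark: derives the backward mapping
// from the forward one so the two cannot drift apart.
object ColumnMapping {
  // Swap keys and values. Fail fast if two fields map to the same column
  // name, because such a mapping could not be reversed when reading back.
  def invert(from: Map[String, String]): Map[String, String] = {
    val to = from.map { case (field, column) => column -> field }
    require(to.size == from.size, "mapping is not one-to-one: duplicate target column names")
    to
  }

  // Case class field name -> column name stored in the Parquet file.
  val from: Map[String, String] = Map("name" -> "a", "age" -> "b")

  // Stored column name -> case class field name, derived from `from`.
  val to: Map[String, String] = invert(from)
}
```

With this, only `from` has to be edited when a field is renamed, and a duplicate target column name is reported at startup rather than surfacing later as a missing column.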
object Example {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row, SQLContext}
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    // case class field name -> column name in the Parquet file
    val from = Map("name" -> "a", "age" -> "b")
    // column name in the Parquet file -> case class field name
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]): Unit = {
    // init
    val conf = new SparkConf()
      .setAppName("Example.")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet(personsDF, "persons.parquet", sc, sqlContext)
    val otherPersonDF = readParquet("persons.parquet", sc, sqlContext)
  }

  def writeParquet(personsDF: DataFrame, path: String, sc: SparkContext, sqlContext: SQLContext): Unit = {
    import Mapping.from
    // alias each column with its name in the file
    val mapping = from.map { (x: (String, String)) => personsDF(x._1).as(x._2) }.toArray
    val mappedDF = personsDF.select(mapping: _*)
    mappedDF.write.parquet(path) // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext): DataFrame = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b
    // alias each file column back to the case class field name
    val mapping = to.map { (x: (String, String)) => df(x._1).as(x._2) }.toArray
    df.select(mapping: _*)
  }
}
If you need to convert a DataFrame back to an RDD[Person], then
val rdd: RDD[Row] = personsDF.rdd
val personsRDD: RDD[Person] = rdd.map { r: Row =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age"))
}
Also have a look at "How to convert spark SchemaRDD into RDD of my case class?"
Upvotes: 8