Assume I have the following RDD:
val aSeq = Seq(
  ("a", Seq(("aa", 1.0), ("bb", 2.0), ("cc", 3.0))),
  ("b", Seq(("aa", 3.0), ("bb", 4.0), ("cc", 5.0))),
  ("c", Seq(("aa", 6.0), ("bb", 7.0), ("cc", 8.0))),
  ("d", Seq(("aa", 9.0), ("bb", 10.0), ("cc", 11.0))))

val anRdd = sc.parallelize(aSeq)
How can I create a DataFrame that uses the first values of each inner sequence to name and structure the schema? If I convert it to a DataFrame directly, I get the following:
val aDF = anRdd.toDF("id","column2")
aDF.printSchema
root
 |-- id: string
 |-- column2: array
 |    |-- element: struct
 |    |    |-- _1: string
 |    |    |-- _2: double
To be clearer, what I want is something like the following:
root
 |-- id: string (nullable = true)
 |-- column2: struct (nullable = true)
 |    |-- aa: double
 |    |-- bb: double
 |    |-- cc: double
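The target struct is, in effect, the inner sequence keyed by its first tuple element. As a plain-Scala sketch (no Spark involved; `rows` and `keyed` are illustrative names of my own), the reshaping looks like this:

```scala
// Key each inner Seq by its first tuple element so fields can be
// addressed by name -- the same idea the answer below expresses on the
// DataFrame side with toMap plus struct/getItem.
val rows = Seq(
  ("a", Seq(("aa", 1.0), ("bb", 2.0), ("cc", 3.0))),
  ("b", Seq(("aa", 3.0), ("bb", 4.0), ("cc", 5.0))))
val keyed = rows.map { case (id, cols) => (id, cols.toMap) }
```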
Edit
@eliasah gave a very understandable answer that produces the desired output. I tried to implement it in my real example, which is more deeply nested. To illustrate, here is an example that goes one level deeper than the first one:
val aSeqB = Seq(
  ("a", Seq(("aa", (("aaa", 1.0), ("bbb", Array(2.0, 2.0)))), ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 4.0)))), ("cc", (("aaa", 4.0), ("bbb", Array(9.0, 3.0)))))),
  ("b", Seq(("aa", (("aaa", 1.0), ("bbb", Array(3.0, 2.0)))), ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 3.0)))), ("cc", (("aaa", 4.0), ("bbb", Array(3.0, 9.0)))))),
  ("c", Seq(("aa", (("aaa", 1.0), ("bbb", Array(3.0, 2.0)))), ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 3.0)))), ("cc", (("aaa", 4.0), ("bbb", Array(3.0, 9.0)))))),
  ("d", Seq(("aa", (("aaa", 1.0), ("bbb", Array(3.0, 2.0)))), ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 3.0)))), ("cc", (("aaa", 4.0), ("bbb", Array(3.0, 9.0)))))))

val anRddB = sc.parallelize(aSeqB)
How can I have a DF with the following schema:
root
 |-- id: string
 |-- column2: struct
 |    |-- aa: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double
 |    |-- bb: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double
 |    |-- cc: struct
 |    |    |-- aaa: double
 |    |    |-- bbb: array
 |    |    |    |-- element: double
How can this be done?
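One way to pin down this target shape (my own sketch, not part of the original post; `Inner`, `Column2`, and `Record` are hypothetical names) is with case classes. Spark derives named struct schemas from case classes via Encoders, so a Dataset or DataFrame of `Record` would carry exactly the nested schema above:

```scala
// Case classes mirroring the desired schema, field for field
case class Inner(aaa: Double, bbb: Array[Double])
case class Column2(aa: Inner, bb: Inner, cc: Inner)
case class Record(id: String, column2: Column2)

// One row of the example data expressed in the target shape
val r = Record("a",
  Column2(Inner(1.0, Array(2.0, 2.0)),
          Inner(8.0, Array(3.0, 4.0)),
          Inner(4.0, Array(9.0, 3.0))))
```

Mapping the raw tuples into such case classes before calling `toDF` would sidestep the `struct`/`getItem` gymnastics entirely, at the cost of writing the conversion by hand.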
If I understand your question correctly, the solution isn't pretty, but here it is. You'll need to import the struct function:
scala> import org.apache.spark.sql.functions.struct
// import org.apache.spark.sql.functions.struct
scala> val seq = Seq(("a",Seq(("aa",(("aaa",1.0),("bbb",Array(2.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,4.0)))),("cc",(("aaa",4.0),("bbb",Array(9.0,3.0)))))),
("b",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
("c",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))),
("d",Seq(("aa",(("aaa",1.0),("bbb",Array(3.0,2.0)))),("bb",(("aaa",8.0),("bbb",Array(3.0,3.0)))),("cc",(("aaa",4.0),("bbb",Array(3.0,9.0)))))))
scala> val anRdd = sc.parallelize(seq)
Convert your column2 to a Map:
scala> val df = anRdd.map(x => (x._1, x._2.toMap)).toDF("x", "y")
// df: org.apache.spark.sql.DataFrame = [x: string, y: map<string,struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>>]
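As a plain-Scala sanity check (no Spark needed; `cols` is just an illustrative name), here is what `toMap` does to one inner sequence of the nested data:

```scala
// toMap keys the inner Seq by its first element, so entries can be
// looked up by name -- the plain-Scala analogue of $"y".getItem("aa")
val cols = Seq(
  ("aa", (("aaa", 1.0), ("bbb", Array(2.0, 2.0)))),
  ("bb", (("aaa", 8.0), ("bbb", Array(3.0, 4.0))))).toMap
val aa = cols("aa")  // a pair of pairs: ((name, double), (name, array))
```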
Pull up the first set of fields:
scala> val df2 = df.select($"x".as("id"), struct($"y".getItem("aa").as("aa"),$"y".getItem("bb").as("bb"),$"y".getItem("cc").as("cc")).as("column2"))
// df2: org.apache.spark.sql.DataFrame = [id: string, column2: struct<aa:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>,bb:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>,cc:struct<_1:struct<_1:string,_2:double>,_2:struct<_1:string,_2:array<double>>>>]
scala> df2.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- column2: struct (nullable = false)
//  |    |-- aa: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)
//  |    |-- bb: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)
//  |    |-- cc: struct (nullable = true)
//  |    |    |-- _1: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: double (nullable = false)
//  |    |    |-- _2: struct (nullable = true)
//  |    |    |    |-- _1: string (nullable = true)
//  |    |    |    |-- _2: array (nullable = true)
//  |    |    |    |    |-- element: double (containsNull = false)
scala> df2.show(false)
// +---+----------------------------------------------------------------------------------------------------------------------------+
// |id |column2 |
// +---+----------------------------------------------------------------------------------------------------------------------------+
// |a |[[[aaa,1.0],[bbb,WrappedArray(2.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 4.0)]],[[aaa,4.0],[bbb,WrappedArray(9.0, 3.0)]]]|
// |b |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// |c |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// |d |[[[aaa,1.0],[bbb,WrappedArray(3.0, 2.0)]],[[aaa,8.0],[bbb,WrappedArray(3.0, 3.0)]],[[aaa,4.0],[bbb,WrappedArray(3.0, 9.0)]]]|
// +---+----------------------------------------------------------------------------------------------------------------------------+
Update: To follow up on the question's edit, I'll use the DataFrame df2 and continue pulling up the nested fields. It's a bit tricky, but here it goes:
val df3 = df2.select(
  $"id",
  struct(
    struct($"column2.aa._1".getItem("_2").as("aaa"), $"column2.aa._2".getItem("_2").as("bbb")).as("aa"),
    struct($"column2.bb._1".getItem("_2").as("aaa"), $"column2.bb._2".getItem("_2").as("bbb")).as("bb"),
    struct($"column2.cc._1".getItem("_2").as("aaa"), $"column2.cc._2".getItem("_2").as("bbb")).as("cc")
  ).as("column2")
)
// df3: org.apache.spark.sql.DataFrame = [id: string, column2: struct<aa:struct<aaa:double,bbb:array<double>>,bb:struct<aaa:double,bbb:array<double>>,cc:struct<aaa:double,bbb:array<double>>>]
There is no magic here: you need a good grasp of the gymnastics of struct and nested types to combine them and get the expected output:
df3.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- column2: struct (nullable = false)
//  |    |-- aa: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
//  |    |-- bb: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
//  |    |-- cc: struct (nullable = false)
//  |    |    |-- aaa: double (nullable = true)
//  |    |    |-- bbb: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
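The df3 selection above leans entirely on positional tuple fields. In plain Scala (my illustration, not Spark code), the same navigation looks like this:

```scala
// The value stored under "aa" in the map: ((name, double), (name, array)).
// _1._2 picks out the double and _2._2 the array -- exactly what
// $"column2.aa._1".getItem("_2") and $"column2.aa._2".getItem("_2") select.
val aa = (("aaa", 1.0), ("bbb", Array(2.0, 2.0)))
val aaa = aa._1._2
val bbb = aa._2._2
```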
Note: Tested with spark-shell 2.0