Reputation: 2738
I have 3 DataFrames currently. Call them dfA, dfB, and dfC.
dfA has 3 cols.
dfB has, say, 5 cols; the 2nd col is an FK reference back to a dfA record.
Similarly, dfC has 3 cols, also with a reference back to dfA.
Using Spark SQL I can do a JOIN across the 3:
%sql
SELECT * FROM dfA
INNER JOIN dfB ON dfA.Id = dfB.AId
INNER JOIN dfC ON dfA.Id = dfC.AId
And I'll get my result set, but it's been "flattened", as SQL does with tabular results like this.
I want to load it into a complex schema like this:
val destinationSchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("age", StringType)
  .add("b",
    new StructType()
      .add("street", StringType, true)
      .add("city", StringType, true)
      .add("zip", StringType, true)
  )
  .add("c",
    new StructType()
      .add("somefield", StringType, true)
  )
Any ideas how to take the results of the SELECT and save them to a DataFrame with this schema?
I ultimately want to save the complex StructType, or JSON, and load this into MongoDB using the Mongo Spark Connector.
Or is there a better way to accomplish this from the 3 separate DataFrames (which were originally 3 separate CSV files that were read in)?
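For what it's worth, I expect the final write to look roughly like this (a sketch, not tested: nestedDF stands for the nested result, "mydb" and "people" are placeholder names, and the format string assumes mongo-spark-connector 2.x/3.x; it's "mongodb" in the 10.x connector):
// Sketch only: nestedDF is hypothetical; spark.mongodb.output.uri
// (or a "uri" option) must point at the target cluster
nestedDF.write
  .format("mongo")                // use "mongodb" with connector 10.x
  .mode("append")
  .option("database", "mydb")     // placeholder
  .option("collection", "people") // placeholder
  .save()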
Upvotes: 1
Views: 639
Reputation: 21
The previous answer works if all the records have a 1:1 relationship.
Here is how you can achieve it for 1:M (hint: use collect_set to group rows):
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" column syntax

val destDF = atableDF
  .join(btableDF, atableDF("id") === btableDF("id")).drop(btableDF("id"))
  .join(ctableDF, atableDF("id") === ctableDF("id")).drop(ctableDF("id"))
  .groupBy($"id", $"name", $"age")
  // collect_set gathers each id's child rows into an array of structs
  .agg(
    collect_set(struct($"street", $"city", $"zip")) as "b",
    collect_set(struct($"somefield")) as "c")

val jsonDestDF = destDF.select(to_json(struct($"*")).as("row"))
display(jsonDestDF)
which will give you the following output:
row
{"id":102,"name":"Damian","age":"23","b":[{"street":"Short Street","city":"New York","zip":"70701"}],"c":[{"somefield":"pears"},{"somefield":"pineapples"}]}
{"id":100,"name":"John","age":"43","b":[{"street":"Dark Road","city":"Washington","zip":"98002"}],"c":[{"somefield":"appples"}]}
{"id":101,"name":"Sally","age":"34","b":[{"street":"Light Ave","city":"Los Angeles","zip":"90210"}],"c":[{"somefield":"grapes"},{"somefield":"peaches"},{"somefield":"bananas"}]}
Sample data I used, in case anyone wants to play:
atable.csv
100,"John",43
101,"Sally",34
102,"Damian",23
104,"Rita",14
105,"Mohit",23
btable.csv:
100,"Dark Road","Washington",98002
101,"Light Ave","Los Angeles",90210
102,"Short Street","New York",70701
104,"Long Drive","Buffalo",80345
105,"Circular Quay","Orlando",65403
ctable.csv:
100,"appples"
101,"bananas"
102,"pears"
101,"grapes"
102,"pineapples"
101,"peaches"
Upvotes: 1
Reputation: 21
Given three DataFrames loaded in from CSV files, you can do this:
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" column syntax

val destDF = atableDF
  .join(btableDF, atableDF("id") === btableDF("id")).drop(btableDF("id"))
  .join(ctableDF, atableDF("id") === ctableDF("id")).drop(ctableDF("id"))
  // nest the b and c columns as single structs (1:1 relationships only)
  .select($"id", $"name", $"age",
    struct($"street", $"city", $"zip") as "b",
    struct($"somefield") as "c")

val jsonDestDF = destDF.select(to_json(struct($"*")).as("row"))
which will output:
row
"{""id"":100,""name"":""John"",""age"":""43"",""b"":{""street"":""Dark Road"",""city"":""Washington"",""zip"":""98002""},""c"":{""somefield"":""appples""}}"
"{""id"":101,""name"":""Sally"",""age"":""34"",""b"":{""street"":""Light Ave"",""city"":""Los Angeles"",""zip"":""90210""},""c"":{""somefield"":""bananas""}}"
"{""id"":102,""name"":""Damian"",""age"":""23"",""b"":{""street"":""Short Street"",""city"":""New York"",""zip"":""70701""},""c"":{""somefield"":""pears""}}"
Upvotes: 1