Reputation: 17676
I have several data frames in Spark with a partly similar schema (header) in the beginning and different columns (custom) at the end.
case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)
val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS
This could look like:
+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+
+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+
+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+
How can I merge the schemas, i.e. basically concatenate all the data frames into a single schema
case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom2:Option[String],
               custom3:Option[Int], custom4:Option[Double], custom5:Option[String], `type`:String)
where the columns that are not present in a given source frame are nullable?
The output should look like this for the first record of the data frame named first:
+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+
I was thinking about joining the data frames via the header columns; however, only some of them (let's say header1) would hold the same (actually joinable) values, while the others (header2, header3) would hold different values, i.e.
first
.join(second, Seq("header1", "header2", "header3"), "LEFT")
.join(third, Seq("header1", "header2", "header3"), "LEFT")
.show
resulting in
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
is not correct, as I really just want the equivalent of pandas' pd.concat(axis=0) for the data frames, i.e. most of the records are missing. It would also lack a type column identifying the original data frame, i.e. first, second or third, roughly as sketched below.
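A hypothetical sketch of the tagging part of what I am after (the row-wise concatenation over differing schemas is exactly the missing piece):

import org.apache.spark.sql.functions.lit

// tag every frame with its origin before stacking the rows
val taggedFirst  = first.withColumn("type", lit("first"))
val taggedSecond = second.withColumn("type", lit("second"))
val taggedThird  = third.withColumn("type", lit("third"))
// open question: how to concatenate these despite the different custom columns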
I think a classical full outer join is the solution:
first
.join(second, Seq("header1", "header2", "header3"), "fullouter")
.join(third, Seq("header1", "header2", "header3"), "fullouter")
.show
yields:
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      B|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      B|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
As you can see, there will actually never be a real join; the rows are just concatenated. Is there a simpler operation to achieve the same functionality?
This result is not optimal, as custom1 is a duplicated column name. I would rather see a single custom1 column (with no null values where another frame can fill in a value).
Upvotes: 3
Views: 5373
Reputation: 1
If you are writing files to HDFS, you can achieve this by setting the property spark.sql.parquet.mergeSchema to true and writing the files to the same HDFS location. Spark then automatically merges the schemas and returns all columns when the data is read back. You can achieve this as follows:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# read the source CSV (schem is a schema object assumed to be defined earlier)
eb = spark.read.format("csv").schema(schem).option("path", "/retail/ebay.csv").load()
eb.printSchema()
eb.write.format("parquet").mode("append").save("/retail/parquet_test")

# append a second batch that carries an extra column
from pyspark.sql.functions import lit
eb1 = eb.withColumn("dummy", lit(35))
eb1.printSchema()
eb1.write.format("parquet").mode("append").save("/retail/parquet_test")

# reading the location back returns the merged schema with all columns
eb2 = spark.read.parquet("/retail/parquet_test")
eb2.printSchema()
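Note that schema merging can also be requested for a single read instead of globally; a minimal sketch in Scala, reusing the example path from above:

// enable Parquet schema merging for just this read
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/retail/parquet_test")

merged.printSchema()  // contains the union of all columns written so far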
Upvotes: 0
Reputation: 21
Please test whether the SQL UNION approach provides the desired result. Note that Spark SQL lacks Oracle's To_char/To_number, so the placeholder columns are produced by casting NULL to the matching types (custom3 is an Int and custom4 a Double in the question's schema).
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table1
UNION
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       custom5
FROM   table2
UNION
SELECT header1,
       header2,
       header3,
       CAST(NULL AS STRING) AS custom1,
       custom2,
       custom3,
       custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table3;
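For completeness, a sketch of running this against the Datasets from the question: register them as temporary views under the table names used above and submit the statement via spark.sql (unionQuery below is a hypothetical variable holding the SQL string):

// expose the three Datasets under the names referenced in the SQL
first.createOrReplaceTempView("table1")
second.createOrReplaceTempView("table2")
third.createOrReplaceTempView("table3")

val merged = spark.sql(unionQuery)  // unionQuery: the UNION statement shown above
merged.show()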
Upvotes: 2
Reputation: 1701
Check out my comment to a similar question. Basically you need to union all of the frames. To give them a matching schema, add the missing columns with the dataframe.withColumn(columnName, expr("null")) expression:
import org.apache.spark.sql.functions._

// add each missing custom column as a null literal
val first1  = first.withColumn("custom2", expr("null")).withColumn("custom3", expr("null"))
                   .withColumn("custom4", expr("null")).withColumn("custom5", expr("null"))
val second2 = second.withColumn("custom2", expr("null")).withColumn("custom3", expr("null"))
                    .withColumn("custom4", expr("null"))
val third3  = third.withColumn("custom1", expr("null")).withColumn("custom5", expr("null"))
// unionAll matches columns by position, so select them into one common order first
val cols    = first1.columns.map(col)
val result  = first1.unionAll(second2.select(cols: _*)).unionAll(third3.select(cols: _*))
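On Spark 3.1+ the manual null padding can be avoided altogether: unionByName with allowMissingColumns = true fills absent columns with null, and a lit literal provides the type column the question asks for. A sketch under that version assumption:

import org.apache.spark.sql.functions.lit

// Spark 3.1+: unionByName pads columns missing on either side with null
val all = first.withColumn("type", lit("first"))
  .unionByName(second.withColumn("type", lit("second")), allowMissingColumns = true)
  .unionByName(third.withColumn("type", lit("third")), allowMissingColumns = true)

all.show()

Because unionByName matches columns by name, this also yields a single custom1 column, filled from whichever frame provides a value.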
Upvotes: 3