Georg Heiler

Reputation: 17676

Spark: concatenate data frames and merge schema

I have several data frames in Spark with partly similar schemas: the header columns at the beginning are shared, while the custom columns at the end differ.

case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)

import spark.implicits._ // needed for .toDS (imported automatically in spark-shell)

val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS

This could look like:

+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+


+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+


+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+

How can I merge the schemas so that all the data frames are basically concatenated into a single one, such as

case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom2:Option[String],
                custom3:Option[Int], custom4:Option[Double], custom5:Option[String], `type`:String)

where the columns that are not present in a given source frame are null? (Note that type is a reserved word in Scala, hence the backticks.)

In the case of the first record from the data frame named first, the output should look like this:

+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+

I was thinking about joining the data frames via the header columns; however, only some of them (let's say header1) would hold the same (actually joinable) values, while the others (header2, header3) would hold different values, i.e.

first
  .join(second, Seq("header1", "header2", "header3"), "left")
  .join(third, Seq("header1", "header2", "header3"), "left")
  .show

resulting in

+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+

This is not correct, as I just want the equivalent of pandas' pd.concat(axis=0) for the data frames, i.e. the join result is lacking most of the records. It is also lacking a type column identifying the original data frame, i.e. first, second, or third.
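For illustration, a minimal sketch of such a concatenation with a type column, assuming Spark 3.1+ where unionByName accepts allowMissingColumns (older versions require adding the missing columns by hand):

import org.apache.spark.sql.functions.lit

// tag each frame with its origin, then union by column name;
// allowMissingColumns = true (Spark 3.1+) fills the absent custom columns with null
val concatenated = first.toDF.withColumn("type", lit("first"))
  .unionByName(second.toDF.withColumn("type", lit("second")), allowMissingColumns = true)
  .unionByName(third.toDF.withColumn("type", lit("third")), allowMissingColumns = true)
concatenated.show()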

edit

I think a classical full outer join is the solution:

first
  .join(second, Seq("header1", "header2", "header3"), "fullouter")
  .join(third, Seq("header1", "header2", "header3"), "fullouter")
  .show

yields:

+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      B|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      B|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+

As you can see, nothing ever actually joins; the rows are only concatenated. Is there a simpler operation that achieves the same functionality? Also, this result is not optimal, as custom1 shows up as a duplicate column name. I would rather see a single custom1 column (with no null values if there is a second value to fill it).
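One way the duplicate custom1 could be collapsed is by renaming before the join and coalescing afterwards; a sketch (custom1_2 is a hypothetical intermediate name):

import org.apache.spark.sql.functions.{coalesce, col}

// rename second's custom1 so the join does not yield two identically named columns,
// then keep whichever side is non-null
val merged = first
  .join(second.withColumnRenamed("custom1", "custom1_2"), Seq("header1", "header2", "header3"), "fullouter")
  .withColumn("custom1", coalesce(col("custom1"), col("custom1_2")))
  .drop("custom1_2")
  .join(third, Seq("header1", "header2", "header3"), "fullouter")
merged.show()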

Upvotes: 3

Views: 5373

Answers (3)

user3360254

Reputation: 1

If you are writing files to HDFS, you can achieve this by setting the property spark.sql.parquet.mergeSchema to true and writing the files to an HDFS location.

Spark then merges the schemas of all the files on read and returns all columns.

You can achieve this in the following ways:

  1. withColumn and union
  2. specifying the schema explicitly up front and performing the union

For example:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# read the source CSV with an explicit schema (schem is assumed to be defined)
eb = spark.read.format("csv").schema(schem).option("path", "/retail/ebay.csv").load()
eb.printSchema()
eb.write.format("parquet").mode("append").save("/retail/parquet_test")

from pyspark.sql.functions import lit

# append the same data with one extra column, so the parquet files now carry two schemas
eb1 = eb.withColumn("dummy", lit(35))
eb1.printSchema()
eb1.write.format("parquet").mode("append").save("/retail/parquet_test")

# with mergeSchema enabled, the read returns the union of both schemas
eb2 = spark.read.parquet("/retail/parquet_test")
eb2.printSchema()
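For what it is worth, mergeSchema can also be passed per read instead of via the global conf; a minimal Scala sketch reusing the output path from above:

// per-read schema merging, no global conf needed
val merged = spark.read.option("mergeSchema", "true").parquet("/retail/parquet_test")
merged.printSchema()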

Upvotes: 0

Robert Fidler

Reputation: 21

Please test whether the SQL UNION approach provides the desired result. CAST(NULL AS ...) produces the typed null placeholders that Spark SQL accepts, and UNION ALL keeps duplicate rows (plain UNION would deduplicate):

SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table1
UNION ALL
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       custom5
FROM   table2
UNION ALL
SELECT header1,
       header2,
       header3,
       CAST(NULL AS STRING) AS custom1,
       custom2,
       custom3,
       custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table3;
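To run this in Spark, the three Datasets from the question can be registered as temporary views first; a small sketch, where unionQuery is assumed to hold the statement above:

// expose the Datasets to Spark SQL under the table names used in the query
first.createOrReplaceTempView("table1")
second.createOrReplaceTempView("table2")
third.createOrReplaceTempView("table3")
val result = spark.sql(unionQuery) // unionQuery: the UNION ALL statement above
result.show()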

Upvotes: 2

prudenko

Reputation: 1701

Check out my comment to a similar question. Basically you need to union all the frames. To make the schemas match, you need to add each missing column as a typed null via dataframe.withColumn(columnName, lit(null).cast(...)); since union matches columns by position rather than by name, the columns must also be selected in a consistent order (unionAll is deprecated in favor of union since Spark 2.0):

import org.apache.spark.sql.functions.{col, lit}
val allCols = Seq("header1", "header2", "header3", "custom1", "custom2", "custom3", "custom4", "custom5")
val types = Map("custom1" -> "string", "custom2" -> "string", "custom3" -> "int", "custom4" -> "double", "custom5" -> "string")
// add each missing custom column as a typed null, then select a fixed order (union is positional)
def pad(df: org.apache.spark.sql.DataFrame) = types.foldLeft(df) { case (d, (c, t)) =>
  if (d.columns.contains(c)) d else d.withColumn(c, lit(null).cast(t))
}.select(allCols.map(col): _*)
val result = pad(first.toDF).union(pad(second.toDF)).union(pad(third.toDF))
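If the type column from the question is wanted as well, a literal can be appended after the padding; appending it last keeps the positions aligned for the union:

import org.apache.spark.sql.functions.lit
// tag each padded frame with its origin before the positional union
val tagged = pad(first.toDF).withColumn("type", lit("first"))
  .union(pad(second.toDF).withColumn("type", lit("second")))
  .union(pad(third.toDF).withColumn("type", lit("third")))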

Upvotes: 3
