Reputation: 2638
I would like to merge 2 dataframes with (potentially) mismatching schemas
org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> A.unionAll(B)
would result in:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 3 columns and the right has 2;
I would like to do this from within Spark.
However, the Spark docs only propose writing both dataframes out to a directory and reading them back in using spark.read.option("mergeSchema", "true").
So a union doesn't help me out, and neither does the documentation. I would like to keep this extra I/O out of my job if at all possible. Am I missing some undocumented info, or is it not possible (yet)?
Upvotes: 11
Views: 40742
Reputation: 3008
If you are using Spark version 2.3.0 or later, you can use the built-in unionByName function to get the required output.
Link to the Git repo that contains the code for unionByName: https://github.com/apache/spark/blame/cee4ecbb16917fa85f02c635925e2687400aa56b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1894
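A minimal sketch (sample data made up). Note that in Spark 2.3 unionByName requires both sides to have the same set of columns; filling genuinely missing columns with nulls additionally needs the allowMissingColumns flag, which was only added in Spark 3.1:
import spark.implicits._

val A = Seq(("Alice", 30, 165), ("Bob", 25, 180)).toDF("name", "age", "height")
val B = Seq(("Carol", 40)).toDF("name", "age")

// Spark 2.3+: resolves columns by name instead of by position.
// Spark 3.1+: allowMissingColumns = true fills absent columns with nulls.
val merged = A.unionByName(B, allowMissingColumns = true)
merged.show()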
Upvotes: 0
Reputation: 691
Here's a Scala version, also answered here - ( Spark - Merge / Union DataFrame with Different Schema (column names and sequence) to a DataFrame with Master common schema ) -
It takes a list of DataFrames to be unioned; columns with the same name across the DataFrames should have the same datatype.
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

def unionPro(DFList: List[DataFrame], spark: SparkSession): DataFrame = {
  /**
   * This function accepts DataFrames with the same or different schemas/column orders,
   * with some or no common columns, and creates a unioned DataFrame.
   */
  import spark.implicits._
  // Master column list: the distinct union of every DataFrame's columns
  val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => x.union(y)).distinct
  // For each DataFrame, select its own columns and null literals for the rest
  def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[Column] = {
    allCols.toList.map {
      case x if myCols.contains(x) => col(x)
      case x => lit(null).as(x)
    }
  }
  // Create an empty DF, ignoring datatype differences between StructFields and
  // treating fields with the same name (case-insensitive) as the same field
  val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => x.union(y)).groupBy(_.name.toUpperCase).map(_._2.head).toArray)
  val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)
  DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
}
Here is the sample test for it -
val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
unionPro(List(aDF, bDF), spark).show
Which gives output as -
+----+----+----+-----+
|Name| ID| Sal|Deptt|
+----+----+----+-----+
| A| 1|null| null|
| B| 2|null| null|
| C|null| 1| D1|
| D|null| 2| D2|
+----+----+----+-----+
Upvotes: 0
Reputation: 1407
If you read both DataFrames from storage files, you can just use a predefined schema:
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val schemaForRead =
  StructType(List(
    StructField("userId", LongType, true),
    StructField("dtEvent", LongType, true),
    StructField("goodsId", LongType, true)
  ))

val dfA = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file1.parquet")
val dfB = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file2.parquet")
val dfC = dfA.union(dfB)
Note that the schemas in files file1 and file2 can be different and can differ from schemaForRead. If file1 doesn't contain a field from schemaForRead, DataFrame dfA will have that field filled with nulls. If a file contains an additional field not present in schemaForRead, the DataFrame just won't have it.
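For illustration, a minimal sketch of both behaviors (file paths and contents are made up):
import spark.implicits._

// file1.parquet lacks "goodsId": dfA will read it as a column of nulls
Seq((1L, 100L)).toDF("userId", "dtEvent")
  .write.mode("overwrite").parquet("/tmp/file1.parquet")

// file2.parquet carries an extra "comment" column: schemaForRead simply drops it
Seq((2L, 200L, 7L, "hi")).toDF("userId", "dtEvent", "goodsId", "comment")
  .write.mode("overwrite").parquet("/tmp/file2.parquet")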
Upvotes: 0
Reputation: 823
Thanks @conradlee! I modified your solution to allow unions by adding casting and removing the nullability check. It worked for me.
from pyspark.sql import functions as F

def harmonize_schemas_and_combine(df_left, df_right):
    '''
    df_left is the main df; we try to append the new df_right to it.
    Need to do three things here:
    1. Set columns missing from df_right to NULL
    2. Align schemas (data types)
    3. Align column orders
    '''
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType) for f in df_left.schema)
    right_fields = set((f.name, f.dataType) for f in df_right.schema)

    # I. Go over left-unique fields:
    #    - columns in the main df but not in the new df are added as null
    #    - columns in both dfs but with different datatypes are cast to stay
    #      consistent with the main df (left)
    for l_name, l_type in left_fields.difference(right_fields):  # 1. find what left has that right doesn't
        if l_name in right_types:  # 2A. column is in both, so the datatypes must differ
            r_type = right_types[l_name]  # 3. this column's type in right
            df_right = df_right.withColumn(l_name, df_right[l_name].cast(l_type))  # 4. cast to the left type
            print("Casting magic happened on column %s: Left type: %s, Right type: %s. Both are now: %s." % (l_name, l_type, r_type, l_type))
        else:  # 2B. left column is not in right: add a NULL column to the right df
            df_right = df_right.withColumn(l_name, F.lit(None).cast(l_type))

    # Make sure right columns are in the same order as left
    # (columns that exist only in df_right are dropped here)
    df_right = df_right.select(df_left.columns)
    return df_left.union(df_right)
Upvotes: 2
Reputation: 831
Here is another solution for this. I used an RDD union because the DataFrame union operation doesn't support multiple DataFrames.
Note - this should not be used to merge a lot of DataFrames with different schemas. The cost of adding null columns to DataFrames can quickly result in out-of-memory errors (e.g. trying to merge 1,000 DataFrames with 10 columns missing each will result in 10,000 transformations).
If your use case is to read a DataFrame from storage composed of multiple paths with different schemas, a much better option would be to have your data saved as parquet in the first place and then use the 'mergeSchema' option when reading the DataFrame.
import logging
from pyspark.sql import functions as func

def unionDataFramesAndMergeSchema(spark, dfsList):
    '''
    This function can perform a union between x dataFrames with different schemas.
    Non-existing columns will be filled with null.
    Note: If a column exists in 2 dataFrames with different types, an exception will be thrown.
    :example:
    >>> df1 = spark.createDataFrame([{'A': 1, 'B': 1, 'C': 1}])
    >>> df2 = spark.createDataFrame([{'A': 2, 'C': 2, 'DNew': 2}])
    >>> unionDataFramesAndMergeSchema(spark, [df1, df2]).show()
    +---+----+---+----+
    |  A|   B|  C|DNew|
    +---+----+---+----+
    |  2|null|  2|   2|
    |  1|   1|  1|null|
    +---+----+---+----+
    :param spark: The Spark session.
    :param dfsList: A list of dataFrames.
    :return: A union of all dataFrames, with schema merged.
    '''
    if len(dfsList) == 0:
        raise ValueError("DataFrame list is empty.")
    if len(dfsList) == 1:
        logging.info("The list contains only one dataFrame, no need to perform union.")
        return dfsList[0]

    logging.info("Will perform union between {0} dataFrames...".format(len(dfsList)))

    # Collect the unified set of column names and types across all dataFrames
    columnNamesAndTypes = {}
    logging.info("Calculating unified column names and types...")
    for df in dfsList:
        for columnName, columnType in dict(df.dtypes).items():
            if columnName in columnNamesAndTypes and columnNamesAndTypes[columnName] != columnType:
                raise ValueError(
                    "column '{0}' exists in at least 2 dataFrames with different types ('{1}' and '{2}')"
                    .format(columnName, columnType, columnNamesAndTypes[columnName]))
            columnNamesAndTypes[columnName] = columnType
    logging.info("Unified column names and types: {0}".format(columnNamesAndTypes))

    # Add null columns where a dataFrame is missing a unified column
    logging.info("Adding null columns in dataFrames if needed...")
    newDfsList = []
    for df in dfsList:
        newDf = df
        dfTypes = dict(df.dtypes)
        for columnName, columnType in columnNamesAndTypes.items():
            if columnName not in dfTypes:
                newDf = newDf.withColumn(columnName, func.lit(None).cast(columnType))
        newDfsList.append(newDf)

    # Align column order, then union everything in one pass at the RDD level
    dfsWithOrderedColumnsList = [df.select(list(columnNamesAndTypes.keys())) for df in newDfsList]
    logging.info("Performing a flat union between all dataFrames (as rdds)...")
    allRdds = spark.sparkContext.union([df.rdd for df in dfsWithOrderedColumnsList])
    return allRdds.toDF()
Upvotes: 1
Reputation: 5135
Parquet schema merging is disabled by default; turn this option on by either:
(1) setting the global option: spark.sql.parquet.mergeSchema=true
(2) or in code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")
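For example, a minimal sketch of the second approach (paths are made up); in Spark 2.0+ the same call hangs off spark.read:
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/part1.parquet", "/data/part2.parquet")

merged.printSchema() // the union of both files' schemas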
Upvotes: 8
Reputation: 13705
Here's a pyspark solution.
It assumes that if the merge can't take place because one dataframe is missing a column contained in the other, then the right thing is to add the missing column with null values.
On the other hand, if the merge can't take place because the two dataframes share a column with conflicting type or nullability, then the right thing is to raise a TypeError (because that's a conflict you probably want to know about).
from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # union matches columns by position, so align the column order first
    df_left = df_left.select(df_right.columns)
    return df_left.union(df_right)
Upvotes: 3
Reputation: 1711
You can append null columns to frame B and then union the two frames:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Fields present in A but missing from B
val missingFields = A.schema.toSet.diff(B.schema.toSet)
var C: DataFrame = B
for (field <- missingFields) {
  C = C.withColumn(field.name, lit(null).cast(field.dataType))
}
// unionAll matches columns by position, so align C's column order with A's
A.unionAll(C.select(A.columns.head, A.columns.tail: _*))
Upvotes: 8