Reputation: 287
I have a fixed Spark DataFrame order from the target table:
Target Spark Dataframe(col1 string , col2 int , col3 string , col4 double)
Now, if the source data comes in a jumbled order:
Source Spark Dataframe(col3 string , col2 int ,col4 double , col1 string).
How can I rearrange the source DataFrame to match the column order of the target DataFrame using PySpark?
The Source Spark Dataframe should be reordered like below to match the target DataFrame:
Output:
Updated Source Spark Dataframe(col1 string , col2 int , col3 string , col4 double)
Scenario 2:
Source Dataframe =[a,c,d,e]
Target dataframe =[a,b,c,d]
In this scenario, the source DataFrame should be rearranged to [a,b,c,d,e]. In the above example, after the source DataFrame is rearranged, it would have a b column added with null values.
This will ensure that when we use saveAsTable, the source DataFrame can easily be pushed into the table without breaking the existing table.
Upvotes: 0
Views: 5062
Reputation: 43504
Suppose you had the following two DataFrames:
source.show()
#+---+---+---+---+
#| a| c| d| e|
#+---+---+---+---+
#| A| C| 0| E|
#+---+---+---+---+
target.show()
#+---+---+---+---+
#| a| b| c| d|
#+---+---+---+---+
#| A| B| C| 1|
#+---+---+---+---+
With the following data types:
print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]
print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]
If I understand your logic correctly, the following list comprehension should work for you:
from pyspark.sql.functions import col, lit
new_source = source.select(
    *(
        [
            col(t).cast(d) if t in source.columns else lit(None).alias(t)
            for t, d in target.dtypes
        ] +
        [s for s in source.columns if s not in target.columns]
    )
)
new_source.show()
#+---+----+---+---+---+
#| a| b| c| d| e|
#+---+----+---+---+---+
#| A|null| C| 0| E|
#+---+----+---+---+---+
And the resulting output will have the following schema:
new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)
As you can see, column d's datatype changed from string to integer to match the target table's schema.
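Before writing to the table, it may help to compare the result's dtypes against the target's position by position. The following is only a sketch in plain Python: schema_mismatches is a hypothetical helper name, and the new_source_dtypes list is read off the schema printed above rather than taken from a live DataFrame (the exact string Spark reports for the null type may differ between versions).

```python
def schema_mismatches(candidate_dtypes, target_dtypes):
    """Return (position, expected, found) for every target column whose
    name or type differs at the same position in the candidate."""
    mismatches = []
    for position, expected in enumerate(target_dtypes):
        # A candidate with fewer columns than the target yields None here.
        found = candidate_dtypes[position] if position < len(candidate_dtypes) else None
        if found != expected:
            mismatches.append((position, expected, found))
    return mismatches


# Assumed values, copied from the dtypes and schema shown in this answer.
target_dtypes = [("a", "string"), ("b", "string"), ("c", "string"), ("d", "int")]
new_source_dtypes = [
    ("a", "string"), ("b", "null"), ("c", "string"), ("d", "int"), ("e", "string"),
]

problems = schema_mismatches(new_source_dtypes, target_dtypes)
print(problems)
```

With real DataFrames this would be called as schema_mismatches(new_source.dtypes, target.dtypes). In the example, only column b is reported, because its type is null rather than string. The extra trailing column e is not compared, since the loop only walks the target's columns.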
The logic is to first loop over the columns in target and select them if they exist in source.columns or create a column of nulls if it doesn't exist. Then add in the columns from source that don't exist in target.
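The same logic can also be written as SQL expression strings and passed to DataFrame.selectExpr. This is a variant, not the code above: the sketch below casts the missing column's NULL to the target type, so b would come out as string instead of null. The helper name build_select_exprs is hypothetical, and the sketch assumes column names contain no backticks and that the type strings from target.dtypes are accepted inside CAST.

```python
def build_select_exprs(source_columns, target_dtypes):
    """Build expression strings: target columns first (in target order,
    cast to the target type), then any source-only columns unchanged."""
    source_names = set(source_columns)
    target_names = {name for name, _ in target_dtypes}

    exprs = []
    for name, dtype in target_dtypes:
        if name in source_names:
            # Column exists in source: cast it to the target's type.
            exprs.append(f"CAST(`{name}` AS {dtype}) AS `{name}`")
        else:
            # Column is missing in source: typed NULL placeholder.
            exprs.append(f"CAST(NULL AS {dtype}) AS `{name}`")

    # Keep source-only columns at the end, in their original order.
    exprs.extend(f"`{name}`" for name in source_columns if name not in target_names)
    return exprs


# Assumed inputs, mirroring source.columns and target.dtypes from this answer.
source_columns = ["a", "c", "d", "e"]
target_dtypes = [("a", "string"), ("b", "string"), ("c", "string"), ("d", "int")]

exprs = build_select_exprs(source_columns, target_dtypes)
for expr in exprs:
    print(expr)
```

With the DataFrames from this answer, the call would look like source.selectExpr(*build_select_exprs(source.columns, target.dtypes)). Keeping the expression building separate from Spark makes it easy to inspect the generated list before running it.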
Upvotes: 1