user2717470

Reputation: 287

Reorder source Spark dataframe columns to match the order of the target dataframe in PySpark

I have a target table whose Spark DataFrame has a fixed column order:

Target Spark DataFrame(col1 string, col2 int, col3 string, col4 double)

Now, if the source data comes in a jumbled order:

Source Spark DataFrame(col3 string, col2 int, col4 double, col1 string)

How can I rearrange the source DataFrame to match the column order of the target DataFrame using PySpark?

The source Spark DataFrame should be reordered as shown below to match the target DataFrame:

Output:

Updated source Spark DataFrame(col1 string, col2 int, col3 string, col4 double)

Scenario 2:

Source DataFrame = [a, c, d, e]

Target DataFrame = [a, b, c, d]

In this scenario, the source DataFrame should be rearranged to [a, b, c, d, e].

In the above example, the rearranged source DataFrame would have a new b column added and filled with null values.

This ensures that when we use saveAsTable, the source DataFrame can be appended to the table without breaking the existing table.
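
To make the intent concrete, a sketch of the write step (target_table is a placeholder name, and aligned_source stands for the reordered DataFrame I am asking how to build):

# Placeholder sketch: once the source columns match the target table's
# layout, the append should not break the existing table.
aligned_source.write.mode("append").saveAsTable("target_table")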

Upvotes: 0

Views: 5062

Answers (1)

pault

Reputation: 43504

Suppose you had the following two DataFrames:

source.show()
#+---+---+---+---+
#|  a|  c|  d|  e|
#+---+---+---+---+
#|  A|  C|  0|  E|
#+---+---+---+---+

target.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  A|  B|  C|  1|
#+---+---+---+---+
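
For reference, these example DataFrames could be built like this (a sketch; the SparkSession setup is assumed and was not part of the original answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Source: every column is a string, and there is an extra column e.
source = spark.createDataFrame(
    [("A", "C", "0", "E")], "a string, c string, d string, e string"
)

# Target: d is an int, and it has a column b that source lacks.
target = spark.createDataFrame(
    [("A", "B", "C", 1)], "a string, b string, c string, d int"
)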

With the following data types:

print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]

print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]

If I understand your logic correctly, the following list comprehension should work for you:

from pyspark.sql.functions import col, lit

new_source = source.select(
    *(
        [
            # Take the column (cast to the target's type) if source has it;
            # otherwise create a placeholder column of nulls with that name.
            col(t).cast(d) if t in source.columns else lit(None).alias(t)
            for t, d in target.dtypes
        ] +
        # Append any source columns that the target doesn't have.
        [s for s in source.columns if s not in target.columns]
    )
)

new_source.show()
#+---+----+---+---+---+
#|  a|   b|  c|  d|  e|
#+---+----+---+---+---+
#|  A|null|  C|  0|  E|
#+---+----+---+---+---+

And the resulting output will have the following schema:

new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)

As you can see, column d's datatype changed from string to integer to match the target table's schema.
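
One optional tweak (not in the original answer): if you would rather b carry the target's declared string type instead of the null type shown above, you can cast the null placeholder as well:

# Variant: also cast the null placeholder, so b gets the target's dtype
# (string here) instead of the null type.
new_source = source.select(
    *(
        [
            col(t).cast(d) if t in source.columns else lit(None).cast(d).alias(t)
            for t, d in target.dtypes
        ] +
        [s for s in source.columns if s not in target.columns]
    )
)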

Either way, the logic is to first loop over the columns in target, selecting each one (cast to the target's type) if it exists in source.columns, or creating a column of nulls if it doesn't. The columns from source that don't exist in target are then appended at the end.
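
If you need this for more than one table, the same logic could be wrapped in a small helper (a sketch; conform_to_target is a hypothetical name, not a built-in):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

def conform_to_target(source: DataFrame, target: DataFrame) -> DataFrame:
    """Reorder and cast source columns to match target, keeping extra
    source columns at the end."""
    aligned = [
        col(t).cast(d) if t in source.columns else lit(None).cast(d).alias(t)
        for t, d in target.dtypes
    ]
    extras = [s for s in source.columns if s not in target.columns]
    return source.select(*(aligned + extras))

new_source = conform_to_target(source, target)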

Upvotes: 1
