Ivan

Reputation: 20101

Concatenate two PySpark dataframes

I'm trying to concatenate two PySpark dataframes with some columns that are only on one of them:

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+---+
| id|
+---+
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

Now I want to generate a third dataframe. I would like something like pandas' concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

# do some concatenation here, how?

df_concat.show()

+---+--------------------+--------------------+-----------+
| id|             uniform|              normal|   normal_2|
+---+--------------------+--------------------+-----------+
|  0|  0.8122802274304282|  1.2423430583597714|       None|
|  1|  0.8642043127063618|  0.3900018344856156|       None|
|  2|  0.8292577771850476|  1.8077401259195247|       None|
|  3|   0.198558705368724| -0.4270585782850261|       None|
|  4|0.012661361966674889|   0.702634599720141|       None|
|  5|  0.8535692890157796|-0.42355804115129153|       None|
|  6|  0.3723296190171911|  1.3789648582622995|       None|
|  7|  0.9529794127670571| 0.16238718777444605|       None|
|  8|  0.9746632635918108| 0.02448061333761742|       None|
|  9|   0.513622008243935|  0.7626741803250845|       None|
| 11|  0.3221262660507942|                None|      0.123|
| 12|  0.4030672316912547|                None|    0.12323|
| 13|  0.9690555459609131|                None|      0.123|
| 14|0.011913836266515876|                None|    0.18923|
| 15|  0.9359607054250594|                None|    0.99123|
| 16| 0.45680471157575453|                None|      0.123|
| 17|  0.6411908952297819|                None|      1.123|
| 18|  0.5669232696934479|                None|    0.10023|
| 19|   0.513622008243935|                None|0.916332123|
+---+--------------------+--------------------+-----------+

Is that possible?

Upvotes: 136

Views: 447983

Answers (14)

Daniel de Paula

Reputation: 17872

Maybe you can try creating the missing columns before calling union (unionAll for Spark 1.6 or lower):

from pyspark.sql.functions import lit

cols = ['id', 'uniform', 'normal', 'normal_2']    

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)

result = df_1_new.union(df_2_new)

# To remove the duplicates:

result = result.dropDuplicates()
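
Note that lit(None) creates a NullType column, which can cause trouble downstream (for example, writing NullType columns to Parquet fails). A variant with an explicit cast, assuming the double-typed columns from the question:

from pyspark.sql.functions import lit

# Cast the placeholder columns so the schema carries a concrete type
df_1_new = df_1.withColumn("normal_2", lit(None).cast("double")).select(cols)
df_2_new = df_2.withColumn("normal", lit(None).cast("double")).select(cols)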

Upvotes: 153

ganesh kavhar

Reputation: 33

Here is the basic script to perform concatenation of dataframes:

from pyspark.sql.functions import lit

cols = ['id', 'uniform', 'normal', 'normal_2']    

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)

result = df_1_new.union(df_2_new)

# To remove the duplicates:

result = result.dropDuplicates()

Upvotes: 0

Tree DR

Reputation: 469

To concatenate multiple pyspark dataframes into one:

from functools import reduce

df = reduce(lambda x, y: x.union(y), [df_1, df_2])

You can replace [df_1, df_2] with a list of any length.
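
If the dataframes don't all share the same columns, a variant of the same idea (a sketch, assuming Spark 3.1+ for allowMissingColumns) lets unionByName fill the gaps:

from functools import reduce

# Assumes Spark >= 3.1: missing columns are filled with null
df = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), [df_1, df_2])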

Upvotes: 15

G.G

Reputation: 765

import pyspark.pandas as ps
ps.concat([df_1.pandas_api('id'), df_2.pandas_api('id')])

out:

+-------------------+--------------------+--------------------+
|            uniform|              normal|            normal_2|
+-------------------+--------------------+--------------------+
|0.03422639313807285| 0.45800664187768786|                null|
| 0.3654625958161396| 0.16420866768809156|                null|
| 0.4175019040792016| -1.0451987154313813|                null|
|0.16452185994603707|  0.8306551181802446|                null|
|0.18141810315190554|  1.3792681955381285|                null|
| 0.9697474945375325|  0.5991404703866096|                null|
|0.34084319330900115| -1.4298752463301871|                null|
| 0.4725977369833597|-0.19668314148825305|                null|
| 0.5996723933366402| -0.6950946460326453|                null|
| 0.6396141227834357|-0.07706276399864868|                null|
|0.03422639313807285|                null| 0.45800664187768786|
| 0.3654625958161396|                null| 0.16420866768809156|
| 0.4175019040792016|                null| -1.0451987154313813|
|0.18141810315190554|                null|  1.3792681955381285|
|0.49595620559530806|                null|-0.02511484514022299|
| 0.9697474945375325|                null|  0.5991404703866096|
| 0.4725977369833597|                null|-0.19668314148825305|
| 0.5996723933366402|                null| -0.6950946460326453|
| 0.6396141227834357|                null|-0.07706276399864868|
+-------------------+--------------------+--------------------+

Upvotes: 2

Prasun Bhattacharyya

Reputation: 72

Here you are trying to concat, i.e. union, all records between 2 dataframes. Use the simple unionByName method in PySpark, which concatenates 2 dataframes along axis 0 the way the pandas concat method does.

Now suppose you have df1 with columns id, uniform, normal, and you also have df2 with columns id, uniform and normal_2. To get a third df3 with columns id, uniform, normal, normal_2, you must first add the non-existent columns to each of the 2 dfs:

from pyspark.sql.functions import lit

# Add df1 columns which are not in df2 to df2 as null columns
for column in df1.columns:
    if column not in df2.columns:
        df2 = df2.withColumn(column, lit(None))

# Add df2 columns which are not in df1 to df1 as null columns
for column in df2.columns:
    if column not in df1.columns:
        df1 = df1.withColumn(column, lit(None))

df3 = df1.unionByName(df2)

Pyspark unionByName documentation

Upvotes: 1

MikA

Reputation: 5542

unionByName is built into Spark and has been available since Spark 2.3.0.

Since Spark 3.1.0, there is an allowMissingColumns option (default False) to handle missing columns. With it set to True, the union works even if the dataframes don't have the same set of columns, setting missing column values to null in the resulting dataframe.

df_1.unionByName(df_2, allowMissingColumns=True).show()

+---+--------------------+--------------------+--------------------+
| id|             uniform|              normal|            normal_2|
+---+--------------------+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|                null|
|  1|  0.8642043127063618|  0.3900018344856156|                null|
|  2|  0.8292577771850476|  1.8077401259195247|                null|
|  3|   0.198558705368724| -0.4270585782850261|                null|
|  4|0.012661361966674889|   0.702634599720141|                null|
|  5|  0.8535692890157796|-0.42355804115129153|                null|
|  6|  0.3723296190171911|  1.3789648582622995|                null|
|  7|  0.9529794127670571| 0.16238718777444605|                null|
|  8|  0.9746632635918108| 0.02448061333761742|                null|
|  9|   0.513622008243935|  0.7626741803250845|                null|
| 11|  0.3221262660507942|                null|  1.0269298899109824|
| 12|  0.4030672316912547|                null|   1.285648175568798|
| 13|  0.9690555459609131|                null|-0.22986601831364423|
| 14|0.011913836266515876|                null|  -0.678915153834693|
| 15|  0.9359607054250594|                null|-0.16557488664743034|
| 16| 0.45680471157575453|                null| -0.3885563551710555|
| 17|  0.6411908952297819|                null|  0.9161177183227823|
| 18|  0.5669232696934479|                null|  0.7270125277020573|
| 19|   0.513622008243935|                null|  0.7626741803250845|
+---+--------------------+--------------------+--------------------+

Upvotes: 42

Bobby John

Reputation: 13

I would solve it this way:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_1.createOrReplaceTempView("tab_1")
df_2.createOrReplaceTempView("tab_2")

df_concat = spark.sql("""
    select tab_1.id, tab_1.uniform, tab_1.normal, tab_2.normal_2
    from tab_1 left join tab_2 on tab_1.uniform = tab_2.uniform
    union
    select tab_2.id, tab_2.uniform, tab_1.normal, tab_2.normal_2
    from tab_2 left join tab_1 on tab_1.uniform = tab_2.uniform
""")
df_concat.show()

Upvotes: 0

imtheone

Reputation: 51

I was trying to implement pandas' append functionality in PySpark, so I created a custom function that can concatenate 2 or more dataframes even if they have different numbers of columns. The only condition is that columns with identical names across dataframes should have the same/matching datatype.

Here is the custom function to merge 2 dataframes:

from pyspark.sql import functions as F

def append_dfs(df1, df2):
    list1 = df1.columns
    list2 = df2.columns
    # Add columns present only in df2 to df1 as nulls
    for col in list2:
        if col not in list1:
            df1 = df1.withColumn(col, F.lit(None))
    # Add columns present only in df1 to df2 as nulls
    for col in list1:
        if col not in list2:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.unionByName(df2)

Usage:

1. Concatenate 2 dataframes:

   final_df = append_dfs(df1, df2)

2. Concatenate more than 2 (say 3) dataframes:

   final_df = append_dfs(append_dfs(df1, df2), df3)

Example, with df1 containing columns Name, Age, Married and df2 containing columns Name, Age, Address (original screenshots omitted):

result = append_dfs(df1, df2)

result:

+------+---+-------+---------+
|  Name|Age|Married|  Address|
+------+---+-------+---------+
|   Jai| 27|   true|     null|
|Princi| 24|  false|     null|
|Gaurav| 22|  false|     null|
|  Anuj| 22|   true|     null|
|   Jai| 27|   null|    Delhi|
|Princi| 24|   null|   Kanpur|
|Gaurav| 22|   null|Allahabad|
|  Anuj| 22|   null|     null|
+------+---+-------+---------+

Hope this is useful.

Upvotes: 1

Shadowtrooper

Reputation: 1452

You can use unionByName to do this:

df = df_1.unionByName(df_2)

unionByName has been available since Spark 2.3.0. Note that it requires both dataframes to have the same column names, so for the question's dataframes (normal vs normal_2) you would need Spark 3.1+ with allowMissingColumns=True, as shown in another answer.

Upvotes: 23

Yuchen

Reputation: 33036

To make it more generic, keeping all columns from both df1 and df2:

import pyspark.sql.functions as F

# Keep all columns in either df1 or df2
def outer_union(df1, df2):

    # Add missing columns to df1
    left_df = df1
    for column in set(df2.columns) - set(df1.columns):
        left_df = left_df.withColumn(column, F.lit(None))

    # Add missing columns to df2
    right_df = df2
    for column in set(df1.columns) - set(df2.columns):
        right_df = right_df.withColumn(column, F.lit(None))

    # Make sure columns are ordered the same
    return left_df.union(right_df.select(left_df.columns))
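
Usage, a sketch with the question's dataframes:

df_concat = outer_union(df_1, df_2)
df_concat.show()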

Upvotes: 11

David

Reputation: 11583

df_concat = df_1.union(df_2)

union resolves columns by position, so for the question's dataframes you should first align the schemas with withColumn(), creating the missing normal and normal_2 columns, as sketched below.
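
A minimal sketch of that alignment, assuming the column names and double types from the question:

from pyspark.sql.functions import lit

# Give each dataframe the column it is missing, then match the column order
df_1_aligned = df_1.withColumn("normal_2", lit(None).cast("double"))
df_2_aligned = df_2.withColumn("normal", lit(None).cast("double")) \
                   .select("id", "uniform", "normal", "normal_2")

df_concat = df_1_aligned.union(df_2_aligned)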

Upvotes: 97

furianpandit

Reputation: 161

The answers above are very elegant. I wrote this function a while back, when I was also struggling to concatenate two dataframes with distinct columns.

Suppose you have dataframes sdf1 and sdf2:

from pyspark.sql import functions as F

def unequal_union_sdf(sdf1, sdf2):
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)

    # Add to sdf1 the columns it is missing, cast to their type in sdf2
    for i, j in s_df2_schema.difference(s_df1_schema):
        sdf1 = sdf1.withColumn(i, F.lit(None).cast(j))

    # Add to sdf2 the columns it is missing, cast to their type in sdf1
    for i, j in s_df1_schema.difference(s_df2_schema):
        sdf2 = sdf2.withColumn(i, F.lit(None).cast(j))

    # Union with a consistent column order
    common_schema_colnames = sdf1.columns
    return sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))

sdf_concat = unequal_union_sdf(sdf1, sdf2) 

Upvotes: 2

Hans

Reputation: 2910

This should do it for you ...

from pyspark.sql.functions import lit, coalesce, col

df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")

df_1.show()
df_2.show()
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
  .select(\
    [coalesce(df_1.id, df_2.id).alias("id")] +\
    [col("df_1." + c) for c in df_1.columns if c != "id"])\
  .sort("id")
df_3.show()

Upvotes: 0

user020314

Reputation: 51

Here is one way to do it, in case it is still useful: I ran this in the pyspark shell with Python 2.7.12 and Spark 2.0.1.

PS: I guess you meant to use different seeds for df_1 and df_2, and the code below reflects that.

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))

def get_uniform(df1_uniform, df2_uniform):
    # Return the first non-null value; checking "is not None" avoids
    # accidentally dropping a legitimate 0.0
    if df1_uniform is not None:
        return df1_uniform
    return df2_uniform

u_get_uniform = F.udf(get_uniform, FloatType())

df_3 = df_1.join(df_2, on="id", how="outer") \
    .select("id",
            u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"),
            "normal", "normal_2") \
    .orderBy(F.col("id"))

Here are the outputs I get:

df_1.show()
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0|0.41371264720975787|  0.5888539012978773|
|  1| 0.7311719281896606|  0.8645537008427937|
|  2| 0.1982919638208397| 0.06157382353970104|
|  3|0.12714181165849525|  0.3623040918178586|
|  4| 0.7604318153406678|-0.49575204523675975|
|  5|0.12030715258495939|  1.0854146699817222|
|  6|0.12131363910425985| -0.5284523629183004|
|  7|0.44292918521277047| -0.4798519469521663|
|  8| 0.8898784253886249| -0.8820294772950535|
|  9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+

df_2.show()
+---+-------------------+--------------------+
| id|            uniform|            normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525|  0.3623040918178586|
| 13|0.12030715258495939|  1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+

df_3.show()
+---+-----------+--------------------+--------------------+                     
| id|    uniform|              normal|            normal_2|
+---+-----------+--------------------+--------------------+
|  0| 0.41371265|  0.5888539012978773|                null|
|  1|  0.7311719|  0.8645537008427937|                null|
|  2| 0.19829196| 0.06157382353970104|                null|
|  3| 0.12714182|  0.3623040918178586|                null|
|  4|  0.7604318|-0.49575204523675975|                null|
|  5|0.120307155|  1.0854146699817222|                null|
|  6| 0.12131364| -0.5284523629183004|                null|
|  7| 0.44292918| -0.4798519469521663|                null|
|  8| 0.88987845| -0.8820294772950535|                null|
|  9|0.036507078| -2.1591956435415334|                null|
| 11| 0.19829196|                null| 0.06157382353970104|
| 12| 0.12714182|                null|  0.3623040918178586|
| 13|0.120307155|                null|  1.0854146699817222|
| 14| 0.12131364|                null| -0.5284523629183004|
| 15| 0.44292918|                null| -0.4798519469521663|
| 16| 0.88987845|                null| -0.8820294772950535|
| 17| 0.27310732|                null|-0.15116027592854422|
| 18|  0.7784518|                null| -0.3785563841011868|
| 19| 0.43776396|                null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+
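
As an aside, since the UDF only picks the first non-null value, Spark's built-in coalesce could replace it (a sketch with the same dataframes; without the FloatType cast, uniform keeps full precision):

df_3 = df_1.join(df_2, on="id", how="outer") \
    .select("id",
            F.coalesce(df_1["uniform"], df_2["uniform"]).alias("uniform"),
            "normal", "normal_2") \
    .orderBy(F.col("id"))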

Upvotes: 5
