Reputation: 8844
df1
     uid1  var1
0    John     3
1    Paul     4
2  George     5
df2
     uid1  var2
0    John    23
1    Paul    44
2  George    52
df3
     uid1  var3
0    John    31
1    Paul    45
2  George    53
df_lst = [df1, df2, df3]
How do I merge/join the three dataframes in the list on the common key uid1?
Edit: Expected output
df1
     uid1  var1  var2  var3
0    John     3    23    31
1    Paul     4    44    45
2  George     5    52    53
Upvotes: 5
Views: 21140
Reputation: 127
Python 3 reduce, analogous to koiralo's answer:
from functools import reduce
from typing import List

from pyspark.sql import DataFrame

dfs: List[DataFrame]  # the dataframes to join, e.g. [df1, df2, df3]
df: DataFrame = reduce(
    lambda left, right: left.join(right, ["key_1", "key_2"]),
    dfs,
)
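As a sanity check of the fold, here is a plain-Python analogue using dicts keyed by uid1 in place of DataFrames (the dicts and the inner_join helper are illustrative assumptions, not part of the pyspark API):

```python
from functools import reduce

# Stand-ins for df1/df2/df3: each "dataframe" maps uid1 -> column dict.
dfs = [
    {'John': {'var1': 3}, 'Paul': {'var1': 4}, 'George': {'var1': 5}},
    {'John': {'var2': 23}, 'Paul': {'var2': 44}, 'George': {'var2': 52}},
    {'John': {'var3': 31}, 'Paul': {'var3': 45}, 'George': {'var3': 53}},
]

def inner_join(left, right):
    # Keep only keys present on both sides, merging their columns,
    # mirroring a DataFrame join on a shared key.
    return {k: {**left[k], **right[k]} for k in left if k in right}

merged = reduce(inner_join, dfs)
# merged['Paul'] -> {'var1': 4, 'var2': 44, 'var3': 45}
```

The fold applies the two-way join pairwise, left to right, which is exactly what `reduce` does with `DataFrame.join`.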
Upvotes: 0
Reputation: 69
Let me suggest a Python answer:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.types as t

# Stop any running context before creating a fresh one
SparkContext._active_spark_context.stop()
sc = SparkContext()
sqlcontext = SQLContext(sc)

rdd_list = [sc.parallelize([('John', i + 1), ('Paul', i + 2), ('George', i + 3)], 1)
            for i in [100, 200, 300]]
df_list = []
for i, r in enumerate(rdd_list):
    schema = t.StructType().add('uid1', t.StringType()) \
                           .add('var{}'.format(i + 1), t.IntegerType())
    df_list.append(sqlcontext.createDataFrame(r, schema))
    df_list[-1].show()
+------+----+
| uid1|var1|
+------+----+
| John| 101|
| Paul| 102|
|George| 103|
+------+----+
+------+----+
| uid1|var2|
+------+----+
| John| 201|
| Paul| 202|
|George| 203|
+------+----+
+------+----+
| uid1|var3|
+------+----+
| John| 301|
| Paul| 302|
|George| 303|
+------+----+
df_res = df_list[0]
for df_next in df_list[1:]:
    df_res = df_res.join(df_next, on='uid1', how='inner')
df_res.show()
+------+----+----+----+
| uid1|var1|var2|var3|
+------+----+----+----+
| John| 101| 201| 301|
| Paul| 102| 202| 302|
|George| 103| 203| 303|
+------+----+----+----+
One more option, using reduce:
from functools import reduce

def join_red(left, right):
    return left.join(right, on='uid1', how='inner')

res = reduce(join_red, df_list)
res.show()
+------+----+----+----+
| uid1|var1|var2|var3|
+------+----+----+----+
| John| 101| 201| 301|
| Paul| 102| 202| 302|
|George| 103| 203| 303|
+------+----+----+----+
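Note that on Python 3, `reduce` must be imported from functools. Also, because the join uses `how='inner'`, any uid1 missing from one dataframe drops out of the final result. A quick plain-dict sketch of that behaviour (the dicts are illustrative stand-ins, not DataFrames):

```python
from functools import reduce

def join_red(left, right):
    # Inner-join semantics: a key survives only if both sides have it.
    return {k: left[k] + right[k] for k in left if k in right}

d1 = {'John': [3], 'Paul': [4], 'George': [5]}
d2 = {'John': [23], 'Paul': [44]}  # George is missing here
res = reduce(join_red, [d1, d2])
# res -> {'John': [3, 23], 'Paul': [4, 44]}; George has dropped out
```

If you need to keep unmatched keys, switch the join to `how='outer'` (or `'left'`) in the DataFrame version.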
Upvotes: 4
Reputation: 23109
You can join a list of dataframes. Below is a simple example (Scala):
import spark.implicits._

val df1 = spark.sparkContext.parallelize(Seq(
  (0, "John", 3),
  (1, "Paul", 4),
  (2, "George", 5)
)).toDF("id", "uid1", "var1")

val df2 = spark.sparkContext.parallelize(Seq(
  (0, "John", 23),
  (1, "Paul", 44),
  (2, "George", 52)
)).toDF("id", "uid1", "var2")

val df3 = spark.sparkContext.parallelize(Seq(
  (0, "John", 31),
  (1, "Paul", 45),
  (2, "George", 53)
)).toDF("id", "uid1", "var3")

val dfs = List(df1, df2, df3)
dfs.reduce((a, b) => a.join(b, Seq("id", "uid1")))
Output:
+---+------+----+----+----+
| id| uid1|var1|var2|var3|
+---+------+----+----+----+
| 1| Paul| 4| 44| 45|
| 2|George| 5| 52| 53|
| 0| John| 3| 23| 31|
+---+------+----+----+----+
Hope this helps!
Upvotes: 8
Reputation: 41957
Merge
and join
are two different things in dataframe
. According to what I understand from your question join
would be the one
joining them as
df1.join(df2, df1.uid1 == df2.uid1).join(df3, df1.uid1 == df3.uid1)
should do the trick but I also suggest to change the column
names of df2
and df3
dataframes
to uid2
and uid3
so that conflict doesn't arise in the future
Upvotes: 0