user3803714

Reputation: 5389

How to join on multiple columns in Pyspark?

I am using Spark 1.3 and would like to join on multiple columns using the Python interface (SparkSQL).

The following works:

I first register them as temp tables.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')

I would now like to join them based on multiple columns.

I get SyntaxError: invalid syntax with this:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')

Upvotes: 97

Views: 263237

Answers (5)

petermeissner

Reputation: 12900

There are many ways to specify the join columns in join(), but I find the most flexible one is a list of expressions.

The on= parameter of join() accepts a list. This list can contain expressions that reference columns and evaluate to true/false. This makes it possible to:

  • join multiple columns
  • join columns with different names
  • join columns that have been renamed beforehand
  • add arbitrary restrictions on when two rows match (e.g. rows from one table must fall within a timespan defined in the other table)

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# reuse the active session, or create one if none exists
spark = SparkSession.builder.getOrCreate()

# dfs
df1 = spark.createDataFrame([(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)], ("x1", "x2", "x3"))
df2 = spark.createDataFrame([(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

# normal join
df1.join(other=df2, on="x1").show()

# multi join
df1.join(other=df2, on=["x1", "x2"]).show()

# multi join with different column names and restrictions
df1.withColumnRenamed("x1", "x1_renamed").alias("df1").join(
    other=df2.alias("df2"),
    on=[
        F.col("df1.x1_renamed") == F.col("df2.x1"), 
        F.col("df1.x2") < F.col("df2.x2"),
    ],
).drop(F.col("df2.x1")).show()

Upvotes: 1

Ander Herrrera

Reputation: 131

test = numeric.join(Ref, 
   on=[
     numeric.ID == Ref.ID, 
     numeric.TYPE == Ref.TYPE,
     numeric.STATUS == Ref.STATUS 
   ], how='inner')

This inner-joins the two DataFrames (numeric and Ref) on multiple constraints, where specific columns from numeric must match specific columns from Ref (e.g. numeric.ID == Ref.ID).

This pattern also lets you match columns that have different names in the two DataFrames (e.g. numeric.ref_ID == Ref.ID).
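
If the column pairs are only known at runtime, the list passed to on= can be built programmatically. A minimal sketch, assuming a hypothetical helper named join_conditions (it is not part of PySpark) that relies only on df[name] lookup and ==, both of which PySpark DataFrames and Columns support:

```python
def join_conditions(left, right, column_pairs):
    """Build one equality condition per (left_name, right_name) pair.

    Hypothetical helper, not part of PySpark. It only relies on
    ``obj[name]`` lookup and ``==``, which DataFrames and Columns provide.
    """
    return [left[l_name] == right[r_name] for l_name, r_name in column_pairs]


# Intended use with the DataFrames from the answer above (requires Spark):
# pairs = [("ID", "ID"), ("TYPE", "TYPE"), ("STATUS", "STATUS")]
# test = numeric.join(Ref, on=join_conditions(numeric, Ref, pairs), how="inner")
```

Each pair names the column on the left DataFrame first and the column on the right DataFrame second, so differently named columns are handled the same way as identically named ones.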

Upvotes: 13

Mehul Fadnavis

Reputation: 31

You can also provide a list of strings, if the column names are the same.

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, ["x1","x2"])

df.show()
+---+---+---+---+
| x1| x2| x3| x3|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

Another way, if the column names differ and you want to rely on column-name strings, is the following:

from pyspark.sql.functions import col

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("y1", "y2", "y3"))

df = df1.join(df2, (col("x1")==col("y1")) & (col("x2")==col("y2")))

df.show()
+---+---+---+---+---+---+
| x1| x2| x3| y1| y2| y3|
+---+---+---+---+---+---+
|  2|  b|3.0|  2|  b|0.0|
+---+---+---+---+---+---+

This is useful if you want to reference column names dynamically, or when a column name contains a space and the df.col_name syntax cannot be used. In that case, though, you should consider renaming the column anyway.

Upvotes: 2

zero323

Reputation: 330453

You should use the & / | operators and be careful about operator precedence (== has lower precedence than bitwise AND and OR, so each comparison needs its own parentheses):

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+
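
The precedence point can be checked in plain Python without Spark. In this small sketch, integers stand in for columns (an assumption made purely for illustration): without parentheses, the expression is read as the chained comparison a == (b & c) == d.

```python
# Plain integers stand in for columns here; no Spark is needed.
a, b = 1, 1
c, d = 2, 2

# & binds tighter than ==, so this is parsed as  a == (b & c) == d,
# i.e. 1 == (1 & 2) == 2, which is 1 == 0 == 2.
unparenthesized = a == b & c == d

# With parentheses, each comparison is evaluated first and the two
# results are then combined with &.
parenthesized = (a == b) & (c == d)

print(unparenthesized)  # False
print(parenthesized)    # True
```

With Column objects the unparenthesized form does not produce the intended condition either, which is why the join above wraps each comparison in parentheses.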

Upvotes: 157

Florian

Reputation: 25435

An alternative approach would be:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

which outputs:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

The main advantage is that the columns on which the tables are joined are not duplicated in the output, which reduces the risk of errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.

Whenever the columns in the two tables have different names (say, in the example above, df2 has the columns y1, y2 and y4), you could use the following syntax:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
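
With more than a couple of columns, the chained withColumnRenamed calls can be driven by a mapping instead. A minimal sketch, assuming a hypothetical helper named rename_columns (not part of PySpark) and a df object that exposes withColumnRenamed(old, new), as PySpark DataFrames do:

```python
from functools import reduce


def rename_columns(df, mapping):
    """Apply withColumnRenamed once per (old, new) entry in mapping.

    Hypothetical helper, not part of PySpark; ``df`` is assumed to expose
    ``withColumnRenamed(old, new)`` returning a new DataFrame.
    """
    return reduce(
        lambda acc, names: acc.withColumnRenamed(names[0], names[1]),
        mapping.items(),
        df,
    )


# Intended use (requires Spark):
# df = df1.join(rename_columns(df2, {"y1": "x1", "y2": "x2"}), ["x1", "x2"])
```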

Upvotes: 100
