Nurzhan Nogerbek

Reputation: 5246

How to correctly join 2 DataFrames in Apache Spark?

I am new to Apache Spark and need some help. Can someone tell me how to correctly join the following 2 DataFrames?

First DataFrame:

| DATE_TIME           | PHONE_NUMBER |
|---------------------|--------------|
| 2019-01-01 00:00:00 | 7056589658   |
| 2019-02-02 00:00:00 | 7778965896   |

Second DataFrame:

| DATE_TIME           | IP            |
|---------------------|---------------|
| 2019-01-01 01:00:00 | 194.67.45.126 |
| 2019-02-02 00:00:00 | 102.85.62.100 |
| 2019-03-03 03:00:00 | 102.85.62.100 |

The final DataFrame I want:

| DATE_TIME           | PHONE_NUMBER | IP            |
|---------------------|--------------|---------------|
| 2019-01-01 00:00:00 | 7056589658   |               |
| 2019-01-01 01:00:00 |              | 194.67.45.126 |
| 2019-02-02 00:00:00 | 7778965896   | 102.85.62.100 |
| 2019-03-03 03:00:00 |              | 102.85.62.100 |

Here is the code I tried:

import org.apache.spark.sql.Dataset
import spark.implicits._

val df1 = Seq(
    ("2019-01-01 00:00:00", "7056589658"),
    ("2019-02-02 00:00:00", "7778965896")
).toDF("DATE_TIME", "PHONE_NUMBER")

df1.show()

val df2 = Seq(
    ("2019-01-01 01:00:00", "194.67.45.126"),
    ("2019-02-02 00:00:00", "102.85.62.100"),
    ("2019-03-03 03:00:00", "102.85.62.100")
).toDF("DATE_TIME", "IP")

df2.show()

val total = df1.join(df2, Seq("DATE_TIME"), "left_outer")

total.show()

Unfortunately, it raises an error:

org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
...

Upvotes: 1

Views: 98

Answers (2)

Ranga Vure

Reputation: 1932

You need a full outer join, but otherwise your code is good. Your issue might be something else, but from the stack trace you posted it is not possible to tell what the problem is.

val total = df1.join(df2, Seq("DATE_TIME"), "full_outer")
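
As a quick sanity check (assuming the same df1 and df2 as in the question), the full outer join should produce exactly the table you are after; the row order is not guaranteed, so I sort by DATE_TIME here, and the output should look roughly like this:

// Same df1/df2 as in the question; full outer join keeps rows from both sides.
val total = df1.join(df2, Seq("DATE_TIME"), "full_outer")
total.orderBy("DATE_TIME").show()
// +-------------------+------------+-------------+
// |          DATE_TIME|PHONE_NUMBER|           IP|
// +-------------------+------------+-------------+
// |2019-01-01 00:00:00|  7056589658|         null|
// |2019-01-01 01:00:00|        null|194.67.45.126|
// |2019-02-02 00:00:00|  7778965896|102.85.62.100|
// |2019-03-03 03:00:00|        null|102.85.62.100|
// +-------------------+------------+-------------+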

Upvotes: 3

Gal Naor

Reputation: 2397

You can do this:

val total = df1.join(df2, (df1("DATE_TIME") === df2("DATE_TIME")), "left_outer")
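
One caveat: joining on an explicit column-equality condition keeps both DATE_TIME columns in the result, so you may want to drop the duplicate one afterwards. A minimal sketch, assuming the same df1 and df2 as in the question:

// Condition-based join keeps df1's and df2's DATE_TIME columns side by side.
val joined = df1.join(df2, df1("DATE_TIME") === df2("DATE_TIME"), "left_outer")
// Drop the duplicate key column that came from df2, keeping df1's.
val total = joined.drop(df2("DATE_TIME"))
total.show()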

Upvotes: 1
