Nurzhan Nogerbek

Reputation: 5236

How to join two DataFrames with combined columns in Spark?

I don't understand how to join these two DataFrames with each other.

The first DataFrame stores information about the times when users sent requests to the service center.

Let's call this DataFrame df1:

+-----------+---------------------+
| USER_NAME | REQUEST_DATE        |
+-----------+---------------------+
| Alex      | 2018-03-01 00:00:00 |
| Alex      | 2018-09-01 00:00:00 |
| Bob       | 2018-03-01 00:00:00 |
| Mark      | 2018-02-01 00:00:00 |
| Mark      | 2018-07-01 00:00:00 |
| Kate      | 2018-02-01 00:00:00 |
+-----------+---------------------+

The second DataFrame stores information about the periods during which a user may use the services of the service center (license periods).

Let's call it df2:

+-----------+---------------------+---------------------+------------+
| USER_NAME | START_SERVICE       | END_SERVICE         | STATUS     |
+-----------+---------------------+---------------------+------------+
| Alex      | 2018-01-01 00:00:00 | 2018-06-01 00:00:00 | Active     |
| Bob       | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | Not Active |
| Mark      | 2018-01-01 00:00:00 | 2018-05-01 23:59:59 | Active     |
| Mark      | 2018-05-01 00:00:00 | 2018-08-01 23:59:59 | VIP        |
+-----------+---------------------+---------------------+------------+

How can I join these two DataFrames to return the result below? In other words, how do I get each user's license status at the time of the request?

+-----------+---------------------+----------------+
| USER_NAME | REQUEST_DATE        | STATUS         |
+-----------+---------------------+----------------+
| Alex      | 2018-03-01 00:00:00 | Active         |
| Alex      | 2018-09-01 00:00:00 | No information |
| Bob       | 2018-03-01 00:00:00 | Not Active     |
| Mark      | 2018-02-01 00:00:00 | Active         |
| Mark      | 2018-07-01 00:00:00 | VIP            |
| Kate      | 2018-02-01 00:00:00 | No information |
+-----------+---------------------+----------------+

CODE:

import org.apache.spark.sql.DataFrame
import spark.implicits._  // needed for toDF when not running in the spark-shell

val df1: DataFrame  = Seq(
    ("Alex", "2018-03-01 00:00:00"),
    ("Alex", "2018-09-01 00:00:00"),
    ("Bob", "2018-03-01 00:00:00"),
    ("Mark", "2018-02-01 00:00:00"),
    ("Mark", "2018-07-01 00:00:00"),
    ("Kate", "2018-07-01 00:00:00")
).toDF("USER_NAME", "REQUEST_DATE")

df1.show()

val df2: DataFrame  = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-06-01 00:00:00", "Active"),
    ("Bob", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "Not Active"),
    ("Mark", "2018-01-01 00:00:00", "2018-05-01 23:59:59", "Active"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 23:59:59", "Active")
).toDF("USER_NAME", "START_SERVICE", "END_SERVICE", "STATUS")

df2.show()

val total = df1
  .join(df2, df1("USER_NAME") === df2("USER_NAME"), "left")
  .filter(df1("REQUEST_DATE") >= df2("START_SERVICE") && df1("REQUEST_DATE") <= df2("END_SERVICE"))
  .select(df1("*"), df2("STATUS"))

total.show()

ERROR:

org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
  at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)

Upvotes: 0

Views: 98

Answers (2)

eakotelnikov

Reputation: 58

How to join these two DataFrames and return such a result?

import org.apache.spark.sql.functions.{col, when}

val df_joined = df1.join(df2, Seq("USER_NAME"), "left")

How to get a list of all users whose licenses are still relevant?

val df_relevant = df_joined
  .withColumn("STATUS",
    when(col("REQUEST_DATE") > col("START_SERVICE") && col("REQUEST_DATE") < col("END_SERVICE"), col("STATUS"))
      .otherwise("No information"))
  .select("USER_NAME", "REQUEST_DATE", "STATUS")
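
Note that this handles users with no match in df2 (such as Kate): the left join leaves START_SERVICE and END_SERVICE null for them, comparisons with null evaluate to null, so the when condition is not satisfied and otherwise("No information") kicks in.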

Upvotes: 2

stack0114106

Reputation: 8711

You are comparing <= and >= on string values, which is incorrect. You should cast them to timestamps before doing such comparisons. The code below worked for me.

BTW, the filter condition you used does not give the results you posted in the question: filtering after the left join drops the unmatched (null) rows, so the "No information" rows disappear. Please check it again.

scala> val df= Seq(("Alex","2018-03-01 00:00:00"),("Alex","2018-09-01 00:00:00"),("Bob","2018-03-01 00:00:00"),("Mark","2018-02-01 00:00:00"),("Mark","2018-07-01 00:00:00"),("Kate","2018-02-01 00:00:00")).toDF("USER_NAME","REQUEST_DATE").withColumn("REQUEST_DATE",to_timestamp('REQUEST_DATE))
df: org.apache.spark.sql.DataFrame = [USER_NAME: string, REQUEST_DATE: timestamp]

scala> df.printSchema
root
 |-- USER_NAME: string (nullable = true)
 |-- REQUEST_DATE: timestamp (nullable = true)


scala> df.show(false)
+---------+-------------------+
|USER_NAME|REQUEST_DATE       |
+---------+-------------------+
|Alex     |2018-03-01 00:00:00|
|Alex     |2018-09-01 00:00:00|
|Bob      |2018-03-01 00:00:00|
|Mark     |2018-02-01 00:00:00|
|Mark     |2018-07-01 00:00:00|
|Kate     |2018-02-01 00:00:00|
+---------+-------------------+


scala> val df2 = Seq(( "Alex","2018-01-01 00:00:00","2018-06-01 00:00:00","Active"),("Bob","2018-01-01 00:00:00","2018-02-01 00:00:00","Not Active"),("Mark","2018-01-01 00:00:00","2018-05-01 23:59:59","Active"),("Mark","2018-05-01 00:00:00","2018-08-01 23:59:59","VIP")).toDF("USER_NAME","START_SERVICE","END_SERVICE","STATUS").withColumn("START_SERVICE",to_timestamp('START_SERVICE)).withColumn("END_SERVICE",to_timestamp('END_SERVICE))
df2: org.apache.spark.sql.DataFrame = [USER_NAME: string, START_SERVICE: timestamp ... 2 more fields]

scala> df2.printSchema
root
 |-- USER_NAME: string (nullable = true)
 |-- START_SERVICE: timestamp (nullable = true)
 |-- END_SERVICE: timestamp (nullable = true)
 |-- STATUS: string (nullable = true)


scala> df2.show(false)
+---------+-------------------+-------------------+----------+
|USER_NAME|START_SERVICE      |END_SERVICE        |STATUS    |
+---------+-------------------+-------------------+----------+
|Alex     |2018-01-01 00:00:00|2018-06-01 00:00:00|Active    |
|Bob      |2018-01-01 00:00:00|2018-02-01 00:00:00|Not Active|
|Mark     |2018-01-01 00:00:00|2018-05-01 23:59:59|Active    |
|Mark     |2018-05-01 00:00:00|2018-08-01 23:59:59|VIP       |
+---------+-------------------+-------------------+----------+


scala> df.join(df2,Seq("USER_NAME"),"leftOuter").filter(" REQUEST_DATE >= START_SERVICE and REQUEST_DATE <= END_SERVICE").select(df("*"),df2("status")).show(false)
+---------+-------------------+------+
|USER_NAME|REQUEST_DATE       |status|
+---------+-------------------+------+
|Alex     |2018-03-01 00:00:00|Active|
|Mark     |2018-02-01 00:00:00|Active|
|Mark     |2018-07-01 00:00:00|VIP   |
+---------+-------------------+------+


scala>

Upvotes: 0
