Nurzhan Nogerbek

Reputation: 5236

How to merge two DataFrames in Spark (Scala)?

I am new to the Spark framework and need some help!

Assume that the first DataFrame (df1) stores the time that users access the call center.

+---------+-------------------+
|USER_NAME|       REQUEST_DATE|
+---------+-------------------+
|     Mark|2018-02-20 00:00:00|
|     Alex|2018-03-01 00:00:00|
|      Bob|2018-03-01 00:00:00|
|     Mark|2018-07-01 00:00:00|
|     Kate|2018-07-01 00:00:00|
+---------+-------------------+

The second DataFrame stores information about whether a person is a member of the organization. OUT means the user has left the organization; IN means the user has joined it. START_DATE and END_DATE mark the beginning and end of the corresponding process.

For example, you can see that Alex began leaving the organization at 2018-01-01 00:00:00, and this process ended at 2018-02-01 00:00:00. Note that one user can join and leave the organization at different times, as Mark does.

+---------+---------------------+---------------------+--------+
|NAME     | START_DATE          | END_DATE            | STATUS |
+---------+---------------------+---------------------+--------+
|     Alex| 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
|      Bob| 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
|     Mark| 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
|     Mark| 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
|    Meggy| 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
+---------+---------------------+---------------------+--------+

This is the DataFrame I am trying to get in the end. It must contain all records from the first DataFrame, plus a column indicating whether the person was a member of the organization at the time of the request (REQUEST_DATE) or not.

+---------+-------------------+----------------+
|USER_NAME|       REQUEST_DATE| USER_STATUS    |
+---------+-------------------+----------------+
|     Mark|2018-02-20 00:00:00| Our user       |
|     Alex|2018-03-01 00:00:00| Not our user   |
|      Bob|2018-03-01 00:00:00| Our user       |
|     Mark|2018-07-01 00:00:00| Our user       |
|     Kate|2018-07-01 00:00:00| No Information |
+---------+-------------------+----------------+

I tried the following code, but computing finalDF raises an error:

org.apache.spark.SparkException: Task not serializable

Also, I need the full datetime in the final result. Right now lastRowByRequestId contains only the date, without the time.

CODE:

val df1 = Seq(
    ("Mark", "2018-02-20 00:00:00"),
    ("Alex", "2018-03-01 00:00:00"),
    ("Bob", "2018-03-01 00:00:00"),
    ("Mark", "2018-07-01 00:00:00"),
    ("Kate", "2018-07-01 00:00:00")
).toDF("USER_NAME", "REQUEST_DATE")

df1.show()

val df2 = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df2.show()

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

case class UserAndRequest(
                           USER_NAME:String,
                           REQUEST_DATE:java.sql.Date,
                           START_DATE:java.sql.Date,
                           END_DATE:java.sql.Date,
                           STATUS:String,
                           REQUEST_ID:Long
                         )

val joined : Dataset[UserAndRequest] = df1.withColumn("REQUEST_ID", monotonically_increasing_id).
  join(df2,$"USER_NAME" === $"NAME", "left").
  as[UserAndRequest]

val lastRowByRequestId = joined.
  groupByKey(_.REQUEST_ID).
  reduceGroups( (x,y) =>
    if (x.REQUEST_DATE.getTime > x.END_DATE.getTime && x.END_DATE.getTime > y.END_DATE.getTime) x else y
  ).map(_._2)

def logic(status: String): String = {
  if (status == "IN") "Our user"
  else if (status == "OUT") "not our user"
  else "No Information"
}

val logicUDF = udf(logic _)

val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"REQUEST_DATE"))
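I suspect the time is lost because the case class declares the date columns as java.sql.Date, which truncates values to the day. A minimal stdlib-only sketch of the difference (no Spark involved), suggesting java.sql.Timestamp would keep the full datetime if the case class fields were changed accordingly:

```scala
import java.sql.{Date, Timestamp}

// Timestamp keeps the time-of-day, while java.sql.Date prints only the
// date part -- the same truncation the case class fields cause.
val raw = "2018-02-20 10:30:00"
val asTimestamp = Timestamp.valueOf(raw)    // keeps 10:30:00
val asDate = new Date(asTimestamp.getTime)  // prints only the date

println(asTimestamp)  // 2018-02-20 10:30:00.0
println(asDate)       // 2018-02-20
```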

Upvotes: 0

Views: 176

Answers (1)

Moustafa Mahmoud

Reputation: 1590

I checked your code and ran it. It works with a minor update: I replaced REQUEST_DATE with STATUS in the UDF call. Also note: Spark's "Task not serializable" error mostly happens when you don't use a case class, but from Spark 2.x onward case classes are encoded automatically in Spark tasks.
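The change matters because logic branches on the STATUS values ("IN"/"OUT"), not on a date. You can check the mapping outside Spark; here null stands in for Kate, who has no matching row in df2 after the left join:

```scala
// The same mapping the UDF applies, exercised without Spark.
def logic(status: String): String =
  if (status == "IN") "Our user"
  else if (status == "OUT") "not our user"
  else "No Information"

println(logic("IN"))   // Our user
println(logic("OUT"))  // not our user
println(logic(null))   // No Information
```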

val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"STATUS"))

Below is the output:

+---------+------------+----------+----------+------+----------+--------------+
|USER_NAME|REQUEST_DATE|START_DATE|  END_DATE|STATUS|REQUEST_ID|   USER_STATUS|
+---------+------------+----------+----------+------+----------+--------------+
|     Mark|  2018-02-20|2018-02-01|2018-03-01|    IN|         0|      Our user|
|     Alex|  2018-03-01|2018-01-01|2018-02-01|   OUT|         1|  not our user|
|     Mark|  2018-07-01|2018-02-01|2018-03-01|    IN|         3|      Our user|
|      Bob|  2018-03-01|2018-02-01|2018-02-05|    IN|         2|      Our user|
|     Kate|  2018-07-01|      null|      null|  null|         4|No Information|
+---------+------------+----------+----------+------+----------+--------------+

Upvotes: 5
