MLstudent

Reputation: 89

Compare two columns from different DataFrames in Spark (Scala)

I am trying to compare two columns, each from a different DataFrame. I have these two DataFrames:

df1
+----+-------+-------+
|Game|rev_1_t|rev_2_t|
+----+-------+-------+
|  CA|    AA |    AA |
|  FT|    B  |    C  |
+----+-------+-------+

df_prev
+----+-------+-------+
|Game|rev_1_t|rev_2_t|
+----+-------+-------+
|  CA|    C  |   AA  |  
|  FT|    B  |   C   |
+----+-------+-------+

I want to compare rev_1_t from df1 with rev_1_t from df_prev and add a new column called change that holds "Y" if the value changed and "N" if it did not. I also want to add a new column called prev_value that stores the previous value of rev_1_t, taken from df_prev. The same applies to rev_2_t, with columns change_2 and prev_value_2. The output would be:

Output:
+----+-------+--------+------------+---------+----------+--------------+
|Game|rev_1_t| change | prev_value | rev_2_t | change_2 | prev_value_2 | 
+----+-------+--------+------------+---------+----------+--------------+
|  CA|    C  |   Y    |      C     |     AA  |   Y      |      C       |
|  FT|    B  |   Y    |      B     |     C   |   Y      |      B       |
+----+-------+--------+------------+---------+----------+--------------+

This is what I have tried so far, but I am getting various errors:

val change = df1.withColumn(
   "change", when (df1("rev_1_t") === df_prev("rev_1_t"), df1("rev_1_t")).otherwise(df_prev("rev_1_t"))
  .withColumn(
   "prev_value", when(df1("rev_1_t") === df_prev("rev_1_t"), "N").otherwise("Y"))

Upvotes: 1

Views: 807

Answers (1)

mck

Reputation: 42352

You can do a join and then compare the relevant columns:

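import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, when}

// Join on Game so each row of df1 sits next to its previous values from df_prev
val result = df1.join(df_prev, Seq("Game"), "left")
    .select(col("Game"),
            df1("rev_1_t"),
            // "N" when the value is unchanged, "Y" otherwise (including no match in df_prev)
            when(df1("rev_1_t") === df_prev("rev_1_t"), "N").otherwise("Y").as("change"),
            df_prev("rev_1_t").as("prev_value"),
            df1("rev_2_t"),
            when(df1("rev_2_t") === df_prev("rev_2_t"), "N").otherwise("Y").as("change_2"),
            df_prev("rev_2_t").as("prev_value_2")
    )
    // Max over one unpartitioned window: if any row changed, every row gets "Y".
    // Drop these two lines to keep a separate flag per row.
    .withColumn("change", max("change").over(Window.orderBy(lit(1))))
    .withColumn("change_2", max("change_2").over(Window.orderBy(lit(1))))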

result.show
+----+-------+------+----------+-------+--------+------------+
|Game|rev_1_t|change|prev_value|rev_2_t|change_2|prev_value_2|
+----+-------+------+----------+-------+--------+------------+
|  CA|     AA|     Y|         C|     AA|       N|          AA|
|  FT|      B|     Y|         B|      C|       N|           C|
+----+-------+------+----------+-------+--------+------------+
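
If you have more than two columns to compare, the same join-then-compare idea can be written once and applied to a list of column names. The following is only a sketch, not part of the original answer: the helper name withChanges and the output column names (<col>_change, <col>_prev) are made up for illustration. It also differs from the code above in two ways: it keeps a separate flag per row (no window max), and it uses the null-safe <=> operator so that two nulls count as "no change".

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object CompareColumns {
  // Hypothetical helper (not from the original answer): for every column in `cols`
  // it adds "<col>_change" (row-wise "Y"/"N") and "<col>_prev" (the value from `prev`).
  def withChanges(current: DataFrame, prev: DataFrame, key: String, cols: Seq[String]): DataFrame = {
    // Alias both sides so same-named columns can be told apart after the join
    val joined = current.as("cur").join(prev.as("prev"), Seq(key), "left")
    val compared = cols.flatMap { c =>
      Seq(
        col(s"cur.$c").as(c),
        // <=> is null-safe equality: null <=> null is true, value <=> null is false
        when(col(s"cur.$c") <=> col(s"prev.$c"), "N").otherwise("Y").as(s"${c}_change"),
        col(s"prev.$c").as(s"${c}_prev")
      )
    }
    joined.select((col(key) +: compared): _*)
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch outside spark-shell
    val spark = SparkSession.builder().master("local[*]").appName("compare-columns").getOrCreate()
    import spark.implicits._

    // Sample data copied from the question
    val df1 = Seq(("CA", "AA", "AA"), ("FT", "B", "C")).toDF("Game", "rev_1_t", "rev_2_t")
    val dfPrev = Seq(("CA", "C", "AA"), ("FT", "B", "C")).toDF("Game", "rev_1_t", "rev_2_t")

    withChanges(df1, dfPrev, "Game", Seq("rev_1_t", "rev_2_t")).show()
    spark.stop()
  }
}
```

With the sample data, only rev_1_t for CA differs between the two DataFrames, so that is the only flag that should come out as "Y". If you need the "any row changed" behaviour from the answer above, you can still apply the window max on the <col>_change columns afterwards.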

Upvotes: 1
