Chemssii

Reputation: 11

PySpark: join two DataFrames and keep the row with the most recent date

I have two DataFrames, A and B.

A

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

B

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

I need to create a new DataFrame where the score is updated by keeping, for each player, the row with the most recent date.

result

+---+------+-----+----------+
|id |player|score|date      |
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+
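
For reference, a minimal sketch to reproduce these example DataFrames (assuming an active SparkSession named spark; the variable names df_A and df_B are just illustrative):

from pyspark.sql.functions import col, to_date

# Build the two example DataFrames and parse the date strings into date columns.
df_A = spark.createDataFrame(
    [(1, 'alpha', 5, '2018-02-13'), (2, 'beta', 6, '2018-02-13')],
    ('id', 'player', 'score', 'date')
).withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))

df_B = spark.createDataFrame(
    [(1, 'alpha', 100, '2019-02-13'), (2, 'beta', 6, '2018-02-13')],
    ('id', 'player', 'score', 'date')
).withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))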

Upvotes: 0

Views: 2052

Answers (2)

pault

Reputation: 43494

You can join the two dataframes, and use pyspark.sql.functions.when() to pick the values for the score and date columns.

from pyspark.sql.functions import col, when

df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="inner")\
    .select(
        "id", 
        "player", 
        when(
            col("b.date") > col("a.date"), 
            col("b.score")
        ).otherwise(col("a.score")).alias("score"),
        when(
            col("b.date") > col("a.date"), 
            col("b.date")
        ).otherwise(col("a.date")).alias("date")
    )\
    .show()
#+---+------+-----+----------+
#| id|player|score|      date|
#+---+------+-----+----------+
#|  1| alpha|  100|2019-02-13|
#|  2|  beta|    6|2018-02-13|
#+---+------+-----+----------+

Read more on when: Spark Equivalent of IF Then ELSE
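
For a quick illustration (a minimal sketch, not part of the original answer), when()/otherwise() behaves like an IF/THEN/ELSE over a column; the threshold 50 and the name score_band are purely illustrative:

from pyspark.sql.functions import col, when

# Label each score as "high" when it exceeds 50, otherwise "low".
df_A.select(
    "player",
    when(col("score") > 50, "high").otherwise("low").alias("score_band")
).show()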

Upvotes: 1

cph_sto

Reputation: 7585

I am assuming that every player is allocated an id and that it doesn't change. The OP wants the resulting DataFrame to contain the score from the most recent date.

# Importing the requisite functions.
from pyspark.sql.functions import col, to_date

# Creating both the DataFrames.
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))

df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))

The idea is to take a union() of these two DataFrames and then keep only the distinct rows. The reason for taking distinct rows afterwards is this: if there was no update for a player, its row in DataFrame B will be identical to the one in DataFrame A, so we remove such duplicates.

# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

Now, as a final step, use a Window function over the unioned DataFrame df to find the latestDate per player, and keep only those rows where the date equals the latestDate. That way, the stale rows are removed for players who had an update (manifested by a newer date in DataFrame B).

w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))\
       .where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

Upvotes: 0
