Reputation: 11
I have two DataFrames, A and B.
A
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 5|2018-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
B
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
and I need to create a new DataFrame where the score is updated by comparing the dates.
result
+---+------+-----+----------+
|id |player|score|date |
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
Upvotes: 0
Views: 2052
Reputation: 43494
You can join the two dataframes and use pyspark.sql.functions.when() to pick the values for the score and date columns.
from pyspark.sql.functions import col, when
df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="inner")\
    .select(
        "id",
        "player",
        when(
            col("b.date") > col("a.date"),
            col("b.score")
        ).otherwise(col("a.score")).alias("score"),
        when(
            col("b.date") > col("a.date"),
            col("b.date")
        ).otherwise(col("a.date")).alias("date")
    )\
    .show()
#+---+------+-----+----------+
#| id|player|score| date|
#+---+------+-----+----------+
#| 1| alpha| 100|2019-02-13|
#| 2| beta| 6|2018-02-13|
#+---+------+-----+----------+
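One caveat: the inner join assumes every (id, player) pair exists in both dataframes. If a player may be present in only one of them, a full outer join combined with coalesce() covers that case as well. This is a sketch of my own under that assumption, not part of the original answer:

from pyspark.sql.functions import coalesce, col, when
df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="full")\
    .select(
        "id",
        "player",
        # b wins only when its date is strictly newer; coalesce() falls back
        # to whichever side actually has a row for this player.
        when(col("b.date") > col("a.date"), col("b.score"))
            .otherwise(coalesce(col("a.score"), col("b.score"))).alias("score"),
        when(col("b.date") > col("a.date"), col("b.date"))
            .otherwise(coalesce(col("a.date"), col("b.date"))).alias("date")
    )\
    .show()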
Read more on when: Spark Equivalent of IF Then ELSE
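For reference, here is a minimal standalone illustration of when()/otherwise() semantics, with toy values of my own (it assumes an existing SparkSession named spark):

from pyspark.sql.functions import col, when
df = spark.createDataFrame([(1, 5), (2, 12)], ("id", "x"))
# Rows where the condition holds get the first value, all others the otherwise() value.
df.select("id", when(col("x") > 10, "big").otherwise("small").alias("size")).show()
#+---+-----+
#| id| size|
#+---+-----+
#|  1|small|
#|  2|  big|
#+---+-----+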
Upvotes: 1
Reputation: 7585
I am making the assumption that every player is allocated an id and that it doesn't change. OP wants the resulting dataframe to contain the score from the most recent date.
# Importing the requisite functions and creating both the DataFrames.
from pyspark.sql.functions import col, to_date
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
The idea is to take a union() of these two dataframes and then keep only the distinct rows. The reason for taking distinct rows afterwards is the following: suppose there was no update for a player; then its row in dataframe B will be identical to the one in dataframe A, and distinct() removes such duplicates.
# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 5|2018-02-13|
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
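A side note of my own, not from the original answer: union() resolves columns by position. If the two dataframes might list their columns in a different order, unionByName() (available since Spark 2.3) matches them by name instead:

df = df_A.unionByName(df_B).distinct()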
Now, as a final step, apply a Window function over the unioned dataframe df to find the latestDate per (id, player), and keep only those rows where the date equals the latestDate. That way, the stale rows for players that had an update (manifested by a newer date in dataframe B) are removed.
w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))\
.where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
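One caveat worth noting (my observation, not part of the answer): if a player ever had two rows sharing the same latest date but different scores, the max-over-window filter keeps both of them. A row_number() based variant, sketched below against the unioned dataframe from before the filter, guarantees exactly one row per player (ties broken arbitrarily unless you add a secondary sort key); w2 and df_one are names of my own:

from pyspark.sql.functions import col, row_number
from pyspark.sql import Window

# Number the rows per player, newest date first, and keep only the first one.
w2 = Window.partitionBy('id', 'player').orderBy(col('date').desc())
df_one = df.withColumn('rn', row_number().over(w2))\
    .where(col('rn') == 1).drop('rn')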
Upvotes: 0